Spring Boot 3 集成 RabbitMQ 實(shí)踐指南(原理解析)
Spring Boot 3 集成 RabbitMQ 實(shí)踐指南
1. RabbitMQ 核心原理
1.1 什么是RabbitMQ
RabbitMQ是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,使用Erlang語(yǔ)言開發(fā),基于AMQP(Advanced Message Queuing Protocol)協(xié)議實(shí)現(xiàn)。它支持多種消息傳遞模式,具有高可用性、可擴(kuò)展性和可靠性等特點(diǎn)。
1.2 核心概念
1.2.1 基礎(chǔ)組件
Producer(生產(chǎn)者)
- 消息的發(fā)送者
- 負(fù)責(zé)創(chuàng)建消息并發(fā)布到RabbitMQ中
Consumer(消費(fèi)者)
- 消息的接收者
- 連接到RabbitMQ服務(wù)器并訂閱隊(duì)列
Exchange(交換機(jī))
- 接收生產(chǎn)者發(fā)送的消息并根據(jù)路由規(guī)則轉(zhuǎn)發(fā)到隊(duì)列
- 類型:
- Direct Exchange:根據(jù)routing key精確匹配
- Topic Exchange:根據(jù)routing key模式匹配
- Fanout Exchange:廣播到所有綁定隊(duì)列
- Headers Exchange:根據(jù)消息屬性匹配
Queue(隊(duì)列)
- 消息存儲(chǔ)的地方
- 支持持久化、臨時(shí)、自動(dòng)刪除等特性
Binding(綁定)
- 交換機(jī)和隊(duì)列之間的虛擬連接
- 定義消息路由規(guī)則
1.2.2 高級(jí)特性
消息持久化
- 交換機(jī)持久化:創(chuàng)建時(shí)設(shè)置durable=true
- 隊(duì)列持久化:創(chuàng)建時(shí)設(shè)置durable=true
- 消息持久化:設(shè)置delivery-mode=2
消息確認(rèn)機(jī)制
- 生產(chǎn)者確認(rèn):Publisher Confirm和Return機(jī)制
- 消費(fèi)者確認(rèn):自動(dòng)確認(rèn)、手動(dòng)確認(rèn)、批量確認(rèn)
死信隊(duì)列(DLX)
- 消息被拒絕且不重新入隊(duì)
- 消息過期(TTL)
- 隊(duì)列達(dá)到最大長(zhǎng)度
1.3 應(yīng)用場(chǎng)景
異步處理
- 發(fā)送郵件、短信通知
- 日志處理、報(bào)表生成
- 文件處理、圖片處理
應(yīng)用解耦
- 系統(tǒng)間通信
- 服務(wù)解耦
- 流程分離
流量控制
- 削峰填谷
- 請(qǐng)求緩沖
- 流量整形
定時(shí)任務(wù)
- 延遲隊(duì)列
- 定時(shí)處理
- 任務(wù)調(diào)度
2. 環(huán)境搭建
2.1 基礎(chǔ)環(huán)境
- Spring Boot: 3.x
- Java: 17+
- RabbitMQ: 3.12+
- Maven/Gradle
2.2 依賴配置
<dependencies>
<!-- Spring Boot Starter AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>2.3 基礎(chǔ)配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 消息確認(rèn)配置
publisher-confirm-type: correlated # 開啟發(fā)布確認(rèn)
publisher-returns: true # 開啟發(fā)布返回
template:
mandatory: true # 消息路由失敗返回
# 消費(fèi)者配置
listener:
simple:
acknowledge-mode: manual # 手動(dòng)確認(rèn)
prefetch: 1 # 每次獲取消息數(shù)量
retry:
enabled: true # 開啟重試
initial-interval: 1000 # 重試間隔時(shí)間
max-attempts: 3 # 最大重試次數(shù)
multiplier: 1.0 # 重試時(shí)間乘數(shù)
# SSL配置(可選)
ssl:
enabled: false
key-store: classpath:keystore.p12
key-store-password: password
trust-store: classpath:truststore.p12
trust-store-password: password3. 核心配置類
3.1 RabbitMQ配置類
@Configuration
@EnableRabbit
public class RabbitMQConfig {
// 交換機(jī)名稱
public static final String BUSINESS_EXCHANGE = "business.exchange";
public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
// 隊(duì)列名稱
public static final String BUSINESS_QUEUE = "business.queue";
public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
// 路由鍵
public static final String BUSINESS_KEY = "business.key";
public static final String DEAD_LETTER_KEY = "dead.letter.key";
// 業(yè)務(wù)交換機(jī)
@Bean
public DirectExchange businessExchange() {
return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE)
.durable(true)
.build();
}
// 死信交換機(jī)
@Bean
public DirectExchange deadLetterExchange() {
return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
.durable(true)
.build();
}
// 業(yè)務(wù)隊(duì)列
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>(3);
// 消息過期時(shí)間
args.put("x-message-ttl", 60000);
// 隊(duì)列最大長(zhǎng)度
args.put("x-max-length", 1000);
// 死信交換機(jī)
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
return QueueBuilder.durable(BUSINESS_QUEUE)
.withArguments(args)
.build();
}
// 死信隊(duì)列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
// 業(yè)務(wù)綁定
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with(BUSINESS_KEY);
}
// 死信綁定
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(DEAD_LETTER_KEY);
}
// 消息轉(zhuǎn)換器
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// RabbitTemplate配置
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}3.2 消息確認(rèn)配置
@Configuration
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息發(fā)送到交換機(jī)成功: correlationData={}", correlationData);
} else {
log.error("消息發(fā)送到交換機(jī)失敗: correlationData={}, cause={}", correlationData, cause);
// 處理失敗邏輯,如重試、告警等
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息路由到隊(duì)列失敗: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText(),
new String(returned.getMessage().getBody()));
// 處理失敗邏輯,如重試、告警等
}
}4. 消息生產(chǎn)者
4.1 消息發(fā)送服務(wù)
@Service
@Slf4j
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(Object message, String exchange, String routingKey) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
log.info("消息發(fā)送成功: message={}, exchange={}, routingKey={}, correlationData={}",
message, exchange, routingKey, correlationData);
} catch (Exception e) {
log.error("消息發(fā)送異常: message={}, exchange={}, routingKey={}, correlationData={}, error={}",
message, exchange, routingKey, correlationData, e.getMessage());
throw new RuntimeException("消息發(fā)送失敗", e);
}
}
public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor messagePostProcessor = msg -> {
msg.getMessageProperties().setDelay((int) delayMillis);
return msg;
};
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
log.info("延遲消息發(fā)送成功: message={}, exchange={}, routingKey={}, delay={}, correlationData={}",
message, exchange, routingKey, delayMillis, correlationData);
} catch (Exception e) {
log.error("延遲消息發(fā)送異常: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}",
message, exchange, routingKey, delayMillis, correlationData, e.getMessage());
throw new RuntimeException("延遲消息發(fā)送失敗", e);
}
}
}5. 消息消費(fèi)者
5.1 消息處理服務(wù)
@Service
@Slf4j
public class MessageConsumer {
@RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
public void handleMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 獲取消息內(nèi)容
String messageBody = new String(message.getBody());
log.info("收到消息: message={}, deliveryTag={}", messageBody, deliveryTag);
// 業(yè)務(wù)處理
processMessage(messageBody);
// 手動(dòng)確認(rèn)消息
channel.basicAck(deliveryTag, false);
log.info("消息處理成功: deliveryTag={}", deliveryTag);
} catch (Exception e) {
log.error("消息處理異常: deliveryTag={}, error={}", deliveryTag, e.getMessage());
// 判斷是否重新投遞
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重試,拒絕消息: deliveryTag={}", deliveryTag);
channel.basicReject(deliveryTag, false);
} else {
log.info("消息首次處理失敗,重新投遞: deliveryTag={}", deliveryTag);
channel.basicNack(deliveryTag, false, true);
}
}
}
private void processMessage(String message) {
// 實(shí)現(xiàn)具體的業(yè)務(wù)邏輯
log.info("處理消息: {}", message);
}
}5.2 死信消息處理
@Service
@Slf4j
public class DeadLetterConsumer {
@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
public void handleDeadLetter(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String messageBody = new String(message.getBody());
log.info("收到死信消息: message={}, deliveryTag={}", messageBody, deliveryTag);
// 死信消息處理邏輯
processDeadLetter(messageBody);
channel.basicAck(deliveryTag, false);
log.info("死信消息處理成功: deliveryTag={}", deliveryTag);
} catch (Exception e) {
log.error("死信消息處理異常: deliveryTag={}, error={}", deliveryTag, e.getMessage());
channel.basicReject(deliveryTag, false);
}
}
private void processDeadLetter(String message) {
// 實(shí)現(xiàn)死信消息處理邏輯
log.info("處理死信消息: {}", message);
}
}6. 接口控制器
@RestController
@RequestMapping("/api/mq")
@Slf4j
public class MessageController {
@Autowired
private MessageProducer messageProducer;
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody MessageDTO message) {
try {
messageProducer.sendMessage(message.getContent(),
RabbitMQConfig.BUSINESS_EXCHANGE,
RabbitMQConfig.BUSINESS_KEY);
return ResponseEntity.ok("消息發(fā)送成功");
} catch (Exception e) {
log.error("消息發(fā)送失敗: {}", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("消息發(fā)送失敗: " + e.getMessage());
}
}
@PostMapping("/send/delay")
public ResponseEntity<String> sendDelayMessage(
@RequestBody MessageDTO message,
@RequestParam long delayMillis) {
try {
messageProducer.sendDelayMessage(message.getContent(),
RabbitMQConfig.BUSINESS_EXCHANGE,
RabbitMQConfig.BUSINESS_KEY,
delayMillis);
return ResponseEntity.ok("延遲消息發(fā)送成功");
} catch (Exception e) {
log.error("延遲消息發(fā)送失敗: {}", e.getMessage());
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("延遲消息發(fā)送失敗: " + e.getMessage());
}
}
}7. 監(jiān)控與運(yùn)維
7.1 RabbitMQ管理界面
- 訪問地址:http://localhost:15672
- 默認(rèn)賬號(hào):guest/guest
- 主要功能:
- 隊(duì)列監(jiān)控
- 交換機(jī)管理
- 連接狀態(tài)
- 消息追蹤
7.2 Prometheus + Grafana監(jiān)控
# prometheus.yml
scrape_configs:
- job_name: 'rabbitmq'
static_configs:
- targets: ['localhost:15692']7.3 日志配置
logging:
level:
org.springframework.amqp: INFO
com.your.package: DEBUG
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"7.4 告警配置
@Configuration
public class RabbitMQAlertConfig {
@Value("${alert.dingtalk.webhook}")
private String webhookUrl;
@Bean
public AlertService alertService() {
return new DingTalkAlertService(webhookUrl);
}
}8. 最佳實(shí)踐
8.1 消息冪等性處理
@Service
public class MessageIdempotentHandler {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public boolean isProcessed(String messageId) {
String key = "mq:processed:" + messageId;
return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));
}
}8.2 消息重試策略
@Configuration
public class RetryConfig {
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
}8.3 消息序列化
@Configuration
public class MessageConverterConfig {
@Bean
public MessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setCreateMessageIds(true);
return converter;
}
}8.4 消息追蹤
@Aspect
@Component
@Slf4j
public class MessageTraceAspect {
@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable {
String messageId = MDC.get("messageId");
log.info("開始處理消息: messageId={}", messageId);
try {
Object result = joinPoint.proceed();
log.info("消息處理完成: messageId={}", messageId);
return result;
} catch (Exception e) {
log.error("消息處理異常: messageId={}, error={}", messageId, e.getMessage());
throw e;
}
}
}9. 常見問題與解決方案
9.1 消息丟失問題
- 生產(chǎn)者確認(rèn)機(jī)制
- 消息持久化
- 手動(dòng)確認(rèn)模式
- 集群高可用
9.2 消息重復(fù)消費(fèi)
- 冪等性處理
- 消息去重
- 業(yè)務(wù)檢查
9.3 消息堆積問題
- 增加消費(fèi)者數(shù)量
- 提高處理效率
- 隊(duì)列分片
- 死信隊(duì)列處理
9.4 性能優(yōu)化
- 合理設(shè)置預(yù)取數(shù)量
- 批量確認(rèn)消息
- 消息壓縮
- 連接池優(yōu)化
10. 高可用部署
10.1 集群配置
spring:
rabbitmq:
addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672
username: admin
password: password
virtual-host: /10.2 鏡像隊(duì)列
# 設(shè)置鏡像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'10.3 負(fù)載均衡
# nginx.conf
upstream rabbitmq_cluster {
server rabbit1:15672;
server rabbit2:15672;
server rabbit3:15672;
}11. 參考資源
到此這篇關(guān)于Spring Boot 3 集成 RabbitMQ 實(shí)踐指南的文章就介紹到這了,更多相關(guān)Spring Boot 3 集成 RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- SpringAMQP消息隊(duì)列(SpringBoot集成RabbitMQ方式)
- 一文掌握Springboot集成RabbitMQ的方法
- springboot2.5.6集成RabbitMq實(shí)現(xiàn)Topic主題模式(推薦)
- Springboot集成RabbitMQ死信隊(duì)列的實(shí)現(xiàn)
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- springboot2.0集成rabbitmq的示例代碼
- Spring Boot系列教程之7步集成RabbitMQ的方法
- springboot集成rabbitMQ之對(duì)象傳輸?shù)姆椒?/a>
- spring boot集成rabbitmq的實(shí)例教程
- 詳解spring boot集成RabbitMQ
相關(guān)文章
Springboot使用put、delete請(qǐng)求報(bào)錯(cuò)405的處理
這篇文章主要介紹了Springboot使用put、delete請(qǐng)求報(bào)錯(cuò)405的處理方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
淺談Java數(shù)據(jù)結(jié)構(gòu)之稀疏數(shù)組知識(shí)總結(jié)
今天帶大家了解一下Java稀疏數(shù)組的相關(guān)知識(shí),文中有非常詳細(xì)的介紹及代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下2021-05-05
SpringMVC使用MultipartResolver實(shí)現(xiàn)文件上傳
MultipartResolver 用于處理文件上傳,當(dāng)收到請(qǐng)求時(shí) DispatcherServlet 的 checkMultipart() 方法會(huì)調(diào)用 MultipartResolver 的 isMultipart() 方法判斷請(qǐng)求中是否包含文件2023-02-02
springSecurity自定義登錄接口和JWT認(rèn)證過濾器的流程
這篇文章主要介紹了springSecurity自定義登陸接口和JWT認(rèn)證過濾器的相關(guān)資料,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-12-12
Java JDK動(dòng)態(tài)代理實(shí)現(xiàn)原理實(shí)例解析
這篇文章主要介紹了Java JDK動(dòng)態(tài)代理實(shí)現(xiàn)原理實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
spring boot攔截器的使用場(chǎng)景示例詳解
這篇文章主要給大家介紹了關(guān)于spring boot攔截器的使用場(chǎng)景,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05
SpringBoot中的maven插件spring-boot-maven-plugin使用
這篇文章主要介紹了SpringBoot中的maven插件spring-boot-maven-plugin使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12
靈活控制任務(wù)執(zhí)行時(shí)間的Cron表達(dá)式范例
這篇文章主要為大家介紹了靈活控制任務(wù)執(zhí)行時(shí)間的Cron表達(dá)式范例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10

