Spring boot框架下的RabbitMQ消息中間件詳解
1. RabbitMQ 基礎(chǔ)概念
1.1 消息處理流程與組件配合
Producer(生產(chǎn)者) 發(fā)送消息。消息先發(fā)送到 Exchange(交換機),而不是直接到隊列。
- Exchange(交換機) 接收到消息后,根據(jù) Routing Key(路由鍵) 和 Binding(綁定規(guī)則),決定將消息發(fā)送到哪些 Queue(隊列)。
- Queue(隊列) 存儲消息,等待 Consumer(消費者) 消費。
- Consumer(消費者) 從隊列中接收并處理消息。
Producer(生產(chǎn)者)
作用:負(fù)責(zé)發(fā)送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。
關(guān)鍵點:
- Producer 只需要知道 Exchange 和 Routing Key,不關(guān)心隊列。
- Producer 不直接與隊列交互,消息的路由和存儲由 Exchange 和 Binding 決定。
代碼示例:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange, String routingKey, String message) { rabbitTemplate.convertAndSend(exchange, routingKey, message); System.out.println("Sent message: " + message); } }
調(diào)用示例:
producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");
direct-exchange
:目標(biāo)交換機。key1
:消息的路由鍵。
Exchange(交換機)
作用:接收來自 Producer 的消息,并根據(jù) Routing Key 和 Binding 的配置,決定將消息發(fā)送到哪些隊列。
Exchange 通常需要手動注冊為 Bean。
- RabbitMQ 的 Exchange 是通過名稱來標(biāo)識的。
- 在 Spring Boot 中,您通過
@Bean
方法注冊 Exchange 時,實際上是將 Exchange 的名稱和類型綁定到 RabbitMQ 服務(wù)器。- 發(fā)送消息時,RabbitMQ 客戶端會根據(jù) Exchange 的名稱找到對應(yīng)的 Exchange,并根據(jù) Routing Key 將消息路由到隊列。
類型:
- Direct Exchange:精確匹配 Routing Key。消息的 Routing Key 必須與 Binding 的 Routing Key 完全一致。
- Topic Exchange:支持通配符匹配。例如,
with("key.*")
可以匹配key.1
、key.2
等。 - Fanout Exchange:忽略 Routing Key,消息會被廣播到所有綁定的隊列。
- Headers Exchange:忽略 Routing Key,根據(jù)消息頭屬性匹配。
代碼示例(定義交換機):
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.core.HeadersExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ExchangeConfig { @Bean public DirectExchange directExchange() { return new DirectExchange("direct-exchange"); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout-exchange"); } @Bean public TopicExchange topicExchange() { return new TopicExchange("topic-exchange"); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers-exchange"); } }
Queue(隊列)
作用:消息的存儲容器,等待消費者從中取出消息進行處理。
Queue 也需要手動注冊為 Bean。Spring Boot 不會自動注冊隊列,因為隊列的名稱和屬性(如是否持久化、是否排他等)需要根據(jù)業(yè)務(wù)需求進行配置。
關(guān)鍵點:
- 消息會保存在隊列中,直到被消費。
- 隊列可以是持久化的(重啟 RabbitMQ 后消息仍然存在)或非持久化的。
代碼示例(定義隊列):
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class QueueConfig { @Bean public Queue demoQueue() { return new Queue("demo-queue", true); // 持久化隊列 } }
Routing Key(路由鍵)
作用:決定消息如何從交換機路由到隊列。
關(guān)鍵點:
- Routing Key 由 Producer 指定。
- 在 Direct 和 Topic 類型的 Exchange 中,Routing Key 決定隊列是否接收消息。
Binding(綁定)
- 作用:將隊列與交換機連接,并定義路由規(guī)則。
- 關(guān)鍵點:
- Binding 定義了隊列接受消息的條件。
- 結(jié)合 Routing Key 和交換機類型,共同決定消息的路由方式。
代碼示例(定義綁定):
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.DirectExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class BindingConfig { @Bean public Binding binding(Queue demoQueue, DirectExchange directExchange) { return BindingBuilder.bind(demoQueue).to(directExchange).with("key1"); } }
with("key1")
的作用是 指定 Binding 的 Routing Key。它的含義是:
- 當(dāng)消息發(fā)送到 Exchange 時,Exchange 會根據(jù)消息的 Routing Key 和 Binding 的 Routing Key 進行匹配。
- 如果匹配成功,消息會被路由到對應(yīng)的隊列;如果匹配失敗,消息會被丟棄或進入死信隊列(如果有配置)。
Consumer(消費者)
作用:從隊列中接收并處理消息。
關(guān)鍵點:
- 消費者與隊列直接關(guān)聯(lián)。
- 多個消費者可以監(jiān)聽同一隊列,實現(xiàn)負(fù)載均衡。
代碼示例:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class Consumer { @RabbitListener(queues = "demo-queue") public void receiveMessage(String message) { System.out.println("Received message: " + message); } }
1.2 RabbitMQ 消息傳輸模型
點對點模型
定義:消息從生產(chǎn)者發(fā)送到隊列,由消費者從隊列中接收,消息只能被一個消費者消費。
實現(xiàn):
- 使用默認(rèn)交換機(空字符串
""
)。 - 直接將消息發(fā)送到隊列。
代碼示例:
rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");
發(fā)布訂閱模型
定義:生產(chǎn)者將消息發(fā)送到 Fanout 類型的交換機,消息會廣播到所有綁定的隊列。
實現(xiàn):
- 不需要 Routing Key。
- 所有綁定到 Fanout 交換機的隊列都會接收消息。
代碼示例:
rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");
路由模型
定義:生產(chǎn)者將消息發(fā)送到 Direct 類型的交換機,根據(jù) Routing Key 精確匹配隊列。
實現(xiàn):
- 隊列通過 Binding 綁定到交換機時,指定 Routing Key。
- 消息的 Routing Key 必須與 Binding 的 Routing Key 一致。
代碼示例:
rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");
2. 環(huán)境準(zhǔn)備
2.1 安裝與配置 RabbitMQ
下載 Docker
訪問 Docker 官方網(wǎng)站:Docker: Accelerated Container Application Development。
根據(jù)您的操作系統(tǒng)(Windows、macOS 或 Linux)下載并安裝 Docker Desktop。
啟動 Docker
- 安裝完成后,啟動 Docker Desktop。
- 確保 Docker 正在運行(任務(wù)欄或菜單欄中可以看到 Docker 圖標(biāo))。
使用 Docker 快速部署 RabbitMQ
Docker 是部署 RabbitMQ 的最簡單方式。通過以下命令,您可以快速啟動一個 RabbitMQ 容器:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
參數(shù)說明:
-d
:以后臺模式運行容器。--name rabbitmq
:為容器指定名稱(rabbitmq
)。-p 5672:5672
:將容器的 5672 端口映射到主機的 5672 端口(RabbitMQ 的消息通信端口)。-p 15672:15672
:將容器的 15672 端口映射到主機的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。rabbitmq:management
:使用帶有管理插件的 RabbitMQ 鏡像。
驗證 RabbitMQ 是否運行
運行以下命令,查看容器是否正常運行:
docker ps
如果看到 rabbitmq
容器正在運行,說明 RabbitMQ 已成功啟動。
2.2 使用 RabbitMQ 管理插件
RabbitMQ 提供了一個 Web 管理界面,方便您監(jiān)控和管理 RabbitMQ。
訪問管理界面
- 打開瀏覽器,訪問
http://localhost:15672
。 - 使用默認(rèn)用戶名和密碼登錄:
- 用戶名:
guest
- 密碼:
guest
- 用戶名:
管理界面功能
- Overview:查看 RabbitMQ 的整體狀態(tài),如連接數(shù)、隊列數(shù)、消息速率等。
- Connections:查看當(dāng)前連接到 RabbitMQ 的客戶端。
- Channels:查看當(dāng)前打開的通道。
- Exchanges:查看和管理 Exchange。
- Queues:查看和管理 Queue。
- Admin:管理用戶和權(quán)限。
2.3 用戶與權(quán)限配置
默認(rèn)情況下,RabbitMQ 只有一個用戶 guest
,密碼也是 guest
。為了安全性和權(quán)限管理,建議創(chuàng)建新用戶并分配權(quán)限。
1. 創(chuàng)建新用戶
- 在 RabbitMQ 管理界面中:
- 點擊頂部導(dǎo)航欄的 Admin。
- 在用戶列表下方,點擊 Add a user。
- 輸入用戶名和密碼,例如:
- 用戶名:
admin
- 密碼:
admin123
- 用戶名:
- 點擊 Add user 完成創(chuàng)建。
2. 分配權(quán)限
- 在用戶列表中,找到剛創(chuàng)建的用戶(如
admin
)。 - 點擊用戶右側(cè)的 Set permission。
- 在權(quán)限設(shè)置頁面:
- Virtual Host:選擇
/
(默認(rèn)的虛擬主機)。 - Configure:輸入
.*
,表示允許用戶配置所有資源。 - Write:輸入
.*
,表示允許用戶寫入所有資源。 - Read:輸入
.*
,表示允許用戶讀取所有資源。
- Virtual Host:選擇
- 點擊 Set permission 完成權(quán)限分配。
3. 使用新用戶登錄
- 退出當(dāng)前用戶(點擊右上角的
guest
,選擇 Log out)。 - 使用新用戶(如
admin
)登錄。
2.4 Spring Boot 中引入 RabbitMQ 依賴
在 pom.xml
中添加以下依賴:
<dependencies> <!-- RabbitMQ 依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
spring-boot-starter-amqp
是 Spring Boot 提供的 RabbitMQ 集成依賴,它包含了以下內(nèi)容:
RabbitMQ 客戶端庫:
- 自動引入 RabbitMQ 的 Java 客戶端庫(
amqp-client
),用于與 RabbitMQ 服務(wù)器通信。
Spring AMQP 支持:
- 提供了 Spring 對 AMQP(Advanced Message Queuing Protocol)的支持,包括
RabbitTemplate
、@RabbitListener
等。
2.5 Spring Boot 配置 RabbitMQ
在 Spring Boot 項目中,您需要在 application.properties
或 application.yml
中配置 RabbitMQ 的連接信息。
示例配置
# RabbitMQ 連接配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin123
配置說明:
spring.rabbitmq.host
:RabbitMQ 服務(wù)器地址(默認(rèn)localhost
)。spring.rabbitmq.port
:RabbitMQ 消息通信端口(默認(rèn)5672
)。spring.rabbitmq.username
:RabbitMQ 用戶名。spring.rabbitmq.password
:RabbitMQ 密碼。
3. Spring Boot 集成 RabbitMQ 的消息生產(chǎn)和消費
3.1 消息生產(chǎn)者(Producer)
- 在 Spring Boot 中,我們使用
RabbitTemplate
來發(fā)送消息。它由spring-boot-starter-amqp
自動配置成為一個 Bean,可直接通過@Autowired
注入。 - 如果 message 不是 String 類型的處理 Spring AMQP(
spring-boot-starter-amqp
)在使用RabbitTemplate
時,默認(rèn)的消息轉(zhuǎn)換器(MessageConverter
)通常會將對象序列化為 JSON 或者將字符串消息轉(zhuǎn)換為字節(jié)。 - 如果你的業(yè)務(wù)數(shù)據(jù)不是
String
,常見做法是:- 在發(fā)送時把非字符串對象序列化(如轉(zhuǎn)換為 JSON 字符串);
- 或者配置自定義的
MessageConverter
,讓 Spring 幫你把對象自動序列化/反序列化。
典型做法:手動序列化為 JSON 再發(fā)送
@Service public class CustomObjectProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendCustomObject(String queueName, MyCustomObject obj) { // 1. 將自定義對象序列化為 JSON 字符串 String jsonString = new Gson().toJson(obj); // 2. 發(fā)送 JSON 字符串到 RabbitMQ rabbitTemplate.convertAndSend(queueName, jsonString); } }
在消費者端,你也可以將消息(JSON 字符串)反序列化為 MyCustomObject
。
配置自定義 Converter(可選)
Spring AMQP 提供了 Jackson2JsonMessageConverter
等現(xiàn)成轉(zhuǎn)換器。
@Configuration public class RabbitConfig { @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // 配置 RabbitTemplate 使用該轉(zhuǎn)換器 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(jsonMessageConverter()); return template; } }
這樣一來,rabbitTemplate.convertAndSend(queueName, myObject)
會自動把 myObject
轉(zhuǎn)成 JSON 發(fā)送;消費者端則自動解析為同樣的 Java 對象。 1)基本消息發(fā)送
場景:
將消息直接發(fā)送到指定的隊列,跳過交換機的路由,讓 RabbitMQ 把消息放到這個隊列中。
核心代碼示例:
@Service public class BasicProducer { @Autowired private RabbitTemplate rabbitTemplate; // 1.自動注入的 RabbitTemplate /** * 2.發(fā)送基本消息到指定的隊列 * @param queueName 目標(biāo)隊列名稱 * @param message 消息內(nèi)容 */ public void sendToQueue(String queueName, String message) { // 3.調(diào)用 convertAndSend,直接將消息放入指定隊列 rabbitTemplate.convertAndSend(queueName, message); System.out.println("Message sent to queue: " + queueName + ", content: " + message); } }
代碼詳解:
@Autowired
private RabbitTemplate rabbitTemplate;`- Spring Boot 自動為我們配置了
RabbitTemplate
,不用手動定義 Bean。 - 通過依賴注入即可使用所有與 RabbitMQ 交互的方法。
- Spring Boot 自動為我們配置了
public void sendToQueue(String queueName, String message)
- 方法參數(shù)包括:
queueName
: 目標(biāo)隊列的名稱。message
: 要發(fā)送的字符串類型消息內(nèi)容。
rabbitTemplate.convertAndSend(queueName, message)
convertAndSend
方法會將消息轉(zhuǎn)換(轉(zhuǎn)換為字節(jié))并發(fā)送到指定隊列。- 如果該隊列不存在,RabbitMQ 會嘗試自動創(chuàng)建(前提是 Broker 端配置允許自動創(chuàng)建隊列)。
2)發(fā)送到交換機
場景:
將消息發(fā)送到一個交換機(Exchange),再由交換機通過 Routing Key
將消息路由到匹配的隊列中。
核心代碼示例:
@Service public class ExchangeProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 發(fā)送消息到指定交換機 * @param exchangeName 交換機名稱 * @param routingKey 路由鍵 * @param message 消息內(nèi)容 */ public void sendToExchange(String exchangeName, String routingKey, String message) { // 將消息發(fā)送到 exchangeName 指定的交換機,使用 routingKey 進行路由 rabbitTemplate.convertAndSend(exchangeName, routingKey, message); System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey); } }
代碼詳解:
exchangeName
- 要發(fā)送到的交換機名稱,例如
"direct-exchange"
、"fanout-exchange"
等。
- 要發(fā)送到的交換機名稱,例如
routingKey
- 路由鍵,用來匹配綁定(Binding)。例如:對
DirectExchange
而言,需要隊列綁定時的路由鍵與發(fā)送時的路由鍵相同,消息才能到達隊列。 rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
- 路由鍵,用來匹配綁定(Binding)。例如:對
- 將消息先發(fā)送到交換機,再根據(jù)路由鍵將消息投遞到目標(biāo)隊列。
3)發(fā)送帶消息屬性的消息
場景:
需要為消息設(shè)置 TTL(過期時間)或優(yōu)先級等屬性,控制消息在隊列中的行為。
核心代碼示例:
@Service public class PropertyProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 發(fā)送帶消息屬性的消息(如 TTL, 優(yōu)先級) */ public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) { // 1.創(chuàng)建 MessageProperties 對象,用于指定消息的屬性 MessageProperties properties = new MessageProperties(); properties.setExpiration("10000"); // 過期時間:10秒 (單位:毫秒) properties.setPriority(5); // 優(yōu)先級設(shè)為 5 // 2.根據(jù)消息體和屬性構(gòu)建 Message 對象 Message message = new Message(messageContent.getBytes(), properties); // 3.使用 send 方法(而非 convertAndSend)直接發(fā)送 Message 對象 rabbitTemplate.send(exchange, routingKey, message); System.out.println("Message with properties sent: " + messageContent); } }
代碼詳解:
MessageProperties properties = new MessageProperties();
MessageProperties
用于設(shè)置 AMQP 協(xié)議層的各種消息頭信息。
properties.setExpiration("10000");
setExpiration
設(shè)置消息的 TTL(Time-To-Live),單位是毫秒。如果到達時間后消息仍未被消費,RabbitMQ 會將其從隊列中移除并送入死信隊列(如果配置了死信隊列)。properties.setPriority(5);
- 設(shè)置消息的優(yōu)先級為 5,前提是隊列本身需要支持優(yōu)先級隊列(創(chuàng)建隊列時指定
x-max-priority
)。new Message(messageContent.getBytes(), properties)
- 將純文本消息轉(zhuǎn)換為
Message
對象,結(jié)合了消息屬性和消息體。rabbitTemplate.send(exchange, routingKey, message);
- 與
convertAndSend
不同,它不會嘗試進行消息轉(zhuǎn)換(如 JSON、字符串),而是直接發(fā)送完整的 AMQPMessage
對象。
Message
構(gòu)造函數(shù)
public Message(byte[] body, MessageProperties messageProperties) { this.body = body; this.messageProperties = messageProperties; }
body
:消息體的字節(jié)數(shù)組。
messageProperties
:AMQP 的消息屬性,包括 TTL、優(yōu)先級、headers 等。、
如果消息體不是String類型
手動轉(zhuǎn)換為字節(jié):你可以先將自定義對象轉(zhuǎn)換為字節(jié)數(shù)組(例如通過 JSON 序列化或 Java 序列化),再放入 new Message(...)
的第一個參數(shù)。
MyCustomObject obj = new MyCustomObject(); // 假設(shè)你想用 JSON String jsonString = new Gson().toJson(obj); byte[] body = jsonString.getBytes(StandardCharsets.UTF_8); MessageProperties properties = new MessageProperties(); // 設(shè)置一些屬性 Message message = new Message(body, properties);
- 為什么不會自動轉(zhuǎn) JSON?使用
new Message(...)
構(gòu)造方法是“純” AMQP 層的做法,不會調(diào)用 Spring 的轉(zhuǎn)換器,因此你必須自己處理序列化。 - 使用
Message
構(gòu)造函數(shù) 時,你必須自行處理對象到byte[]
的轉(zhuǎn)換(無論是字符串、JSON,還是其他格式)。 - 如果想讓 Spring AMQP 自動轉(zhuǎn)換,你通常使用
rabbitTemplate.convertAndSend(Object msg)
這種高級 API,或者配置自定義MessageConverter
。
3.2 消息消費者(Consumer)
消費者的核心功能是在指定的隊列中監(jiān)聽消息,并根據(jù)配置的確認(rèn)模式(自動確認(rèn)或手動確認(rèn))對消息進行處理或拒絕。
1)監(jiān)聽隊列并消費消息
核心代碼示例(自動確認(rèn)模式):
@Service public class Consumer { /** * 使用注解 @RabbitListener 指定要監(jiān)聽的隊列 * 由于默認(rèn)為 auto-ack 模式, * 當(dāng)消息到達后,RabbitMQ 會自動確認(rèn)并從隊列中刪除該消息。 */ @RabbitListener(queues = "demo-queue") public void receiveMessage(String message) { // 1.從 queueName 隊列中取到的消息內(nèi)容 System.out.println("Received message: " + message); // 2.在 auto-ack 模式下,無需手動 ack // 如果這里出現(xiàn)異常,RabbitMQ 不會再次發(fā)送消息給消費者,消息會丟失。 } }
代碼詳解(自動確認(rèn)模式):
@RabbitListener(queues = "demo-queue")
- 聲明監(jiān)聽名為
demo-queue
的隊列。 - 一旦有新消息到達該隊列,就會自動回調(diào)此方法。
- 聲明監(jiān)聽名為
public void receiveMessage(String message)
- 默認(rèn)參數(shù)類型為字符串,當(dāng) RabbitMQ 收到消息后會嘗試將其轉(zhuǎn)換為
String
并注入到message
中。
- 默認(rèn)參數(shù)類型為字符串,當(dāng) RabbitMQ 收到消息后會嘗試將其轉(zhuǎn)換為
- 自動確認(rèn)(auto-ack)的風(fēng)險
- 如果消費者在處理消息時拋出異常,消息已經(jīng)被 RabbitMQ 標(biāo)記為“已確認(rèn)”,不會再重新發(fā)送或進入死信隊列,導(dǎo)致消息丟失。
2)確認(rèn)機制
自動確認(rèn)(auto-ack)
- 行為:
- 當(dāng)消費者從隊列中獲取消息后,RabbitMQ 會立即將該消息標(biāo)記為已確認(rèn)(acknowledged),并從隊列中刪除。
- 問題:
- 如果消息處理失?。ɡ缦M者拋出異常),消息已經(jīng)被確認(rèn)并從隊列中刪除,無法重新處理。
- 如果消費者崩潰或斷開連接,未處理的消息會丟失。
- 適用場景:
- 對消息處理的可靠性要求不高的場景。
手動確認(rèn)(manual-ack)
- 行為:
- 消費者處理完消息后,必須顯式調(diào)用
basicAck
方法確認(rèn)消息。 - 如果消息處理失敗,可以調(diào)用
basicNack
或basicReject
方法拒絕消息。
- 消費者處理完消息后,必須顯式調(diào)用
- 優(yōu)點:
- 確保消息處理的可靠性。
- 支持消息重新入隊或發(fā)送到死信隊列。
- 適用場景:
- 對消息處理的可靠性要求較高的場景。
核心代碼示例:
@Service public class ManualAckConsumer { /** * 在 application.properties 中配置: * spring.rabbitmq.listener.simple.acknowledge-mode=manual * 使得 RabbitMQ 使用手動確認(rèn)模式 */ @RabbitListener(queues = "demo-queue") public void receiveMessage(Message message, Channel channel) throws IOException { try { // 1.從消息中獲取消息體 String body = new String(message.getBody()); System.out.println("Processing message: " + body); // 2.如果業(yè)務(wù)處理成功,則調(diào)用 basicAck 手動確認(rèn) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { System.err.println("Message processing failed: " + e.getMessage()); // 3.如果處理失敗,需要決定是重新入隊還是拒絕并進入死信隊列 // requeue = true -> 重新入隊 // requeue = false -> 丟棄或進入死信隊列(根據(jù)隊列配置) channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }
代碼詳解:
配置手動確認(rèn):
在 application.properties
添加
spring.rabbitmq.listener.simple.acknowledge-mode=manual
表示 Spring AMQP 使用手動確認(rèn)模式(manual-ack)。
public void receiveMessage(Message message, Channel channel)
:
與自動確認(rèn)不同,這里不僅接收字符串,還接收了 org.springframework.amqp.core.Message
對象和 com.rabbitmq.client.Channel
。Message
:包含消息體(body)和消息屬性(headers 等)。Channel
:給我們提供了 basicAck
, basicNack
, basicReject
等底層 AMQP 操作。
手動確認(rèn)成功:
channel.basicAck(deliveryTag, multiple)
:
deliveryTag
:
本次消息的唯一標(biāo)記,從 message.getMessageProperties().getDeliveryTag()
獲取。
multiple = false
:只確認(rèn)當(dāng)前這條消息。
這里的
basicAck(long deliveryTag, boolean multiple)
deliveryTag
并不是在你構(gòu)造Message
時生成的,而是 RabbitMQ Broker 在投遞消息給消費者時由底層 AMQP 協(xié)議自動分配的一個遞增的序號。
long deliveryTag = message.getMessageProperties().getDeliveryTag();
手動確認(rèn)失敗:
channel.basicNack(deliveryTag, multiple, requeue)
或basicReject
:requeue = true
:將消息重新放回隊列等待下一次消費(可能導(dǎo)致死循環(huán),如處理一直失?。?/li>requeue = false
:拒絕消息,若配置了死信隊列,則進入死信隊列;否則丟棄消息。
3)處理消費失敗
- 自動確認(rèn)模式下的處理
- 在自動確認(rèn)模式下,如果消息處理失敗,RabbitMQ 不會重新發(fā)送消息,因為消息已經(jīng)被確認(rèn)并從隊列中刪除。
- 問題:
- 消息丟失,無法重新處理。
- 手動確認(rèn)模式下的處理
- 在手動確認(rèn)模式下,如果消息處理失敗,可以通過以下方式處理:
- 重新入隊:
- 調(diào)用
basicNack
或basicReject
方法,并將requeue
參數(shù)設(shè)置為true
。 - 消息會重新進入隊列,等待下一次消費。
- 發(fā)送到死信隊列:
- 調(diào)用
basicNack
或basicReject
方法,并將requeue
參數(shù)設(shè)置為false
。 - 如果隊列配置了死信隊列,消息會被發(fā)送到死信隊列。
- 重試機制(Spring AMQP 提供的簡單重試)(只支持手動確認(rèn)機制)
是重試失敗了才會將消息重新入隊 ,所以重試在前,重新入隊在后
# 啟用重試 spring.rabbitmq.listener.simple.retry.enabled=true # 最大重試次數(shù) spring.rabbitmq.listener.simple.retry.max-attempts=3 # 初始重試間隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000 # 間隔倍數(shù) spring.rabbitmq.listener.simple.retry.multiplier=2.0 # 最大重試間隔 spring.rabbitmq.listener.simple.retry.max-interval=10000
- Spring AMQP 提供了 重試機制,可以在消費者處理消息失敗時,自動進行多次重試,而不是直接將消息重新入隊。
行為
- 當(dāng)消息處理失敗時,Spring AMQP 會在 本地 進行重試(即不將消息重新入隊),直到達到最大重試次數(shù)。
- 如果重試次數(shù)用盡,消息會被拒絕(
basicNack
或basicReject
),并根據(jù)配置決定是否重新入隊或發(fā)送到死信隊列。
死信隊列(DLQ)
- 當(dāng)消息被拒絕或過期時,RabbitMQ 會將其發(fā)送到我們配置的死信交換機(DLX),再路由到死信隊列(DLQ)。
- 配置示例:
@Configuration public class RabbitConfig { @Bean public Queue normalQueue() { return QueueBuilder.durable("normal-queue") .withArgument("x-dead-letter-exchange", "dead-letter-exchange") // 指定死信交換機 .withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由鍵 .build(); } @Bean public DirectExchange deadLetterExchange() { return new DirectExchange("dead-letter-exchange"); } @Bean public Queue deadLetterQueue() { return new Queue("dead-letter-queue"); } @Bean public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key"); } }
原理:
- 正常隊列通過
x-dead-letter-exchange
指定死信交換機,一旦消息被拒絕(requeue=false
)或超時(TTL 到期),RabbitMQ 會把消息發(fā)送到dead-letter-exchange
。 dead-letter-exchange
與dead-letter-queue
進行綁定(路由鍵dead-letter-routing-key
),從而實現(xiàn)死信隊列的存儲。
重新入隊 vs 發(fā)送到死信隊列
- 重新入隊:
channel.basicNack(deliveryTag, false, true)
- 適用于臨時性錯誤,比如數(shù)據(jù)庫鎖沖突、網(wǎng)絡(luò)抖動等,等待后續(xù)重新處理。
- 發(fā)送到死信隊列:
channel.basicNack(deliveryTag, false, false)
- 適用于永久性錯誤,比如消息格式無法解析,或業(yè)務(wù)邏輯指定不應(yīng)再嘗試。
到此這篇關(guān)于Spring boot框架下的RabbitMQ消息中間件的文章就介紹到這了,更多相關(guān)Spring boot RabbitMQ消息中間件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于Java Springboot + Vue + MyBatis實現(xiàn)音樂播放系統(tǒng)
這篇文章主要介紹了一個完整的音樂播放系統(tǒng)是基于Java Springboot + Vue + MyBatis編寫的,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-08-08Springboot解決no main manifest attribute錯誤
在開發(fā)Springboot項目時,使用java -jar命令運行jar包可能出現(xiàn)no main manifest attribute錯誤,本文就來介紹一下該錯誤的解決方法,感興趣的可以了解一下2024-09-09一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J
本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03使用Java判定一個數(shù)值是否在指定的開閉區(qū)間范圍內(nèi)
這篇文章主要給大家介紹了關(guān)于使用Java判定一個數(shù)值是否在指定的開閉區(qū)間范圍內(nèi)的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Java具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2022-09-09SpringBoot集成Redis實現(xiàn)驗證碼的簡單案例
本文主要介紹了SpringBoot集成Redis實現(xiàn)驗證碼的簡單案例,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08