亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Spring boot框架下的RabbitMQ消息中間件詳解

 更新時間:2025年01月22日 10:46:14   作者:阿乾之銘  
這篇文章詳細(xì)介紹了Spring Boot框架下的RabbitMQ消息中間件的基本概念、消息傳輸模型、環(huán)境準(zhǔn)備、Spring Boot集成以及消息生產(chǎn)和消費,感興趣的朋友跟隨小編一起看看吧

1. RabbitMQ 基礎(chǔ)概念

1.1 消息處理流程與組件配合

Producer(生產(chǎn)者) 發(fā)送消息。消息先發(fā)送到 Exchange(交換機),而不是直接到隊列。

  • Exchange(交換機) 接收到消息后,根據(jù) Routing Key(路由鍵) 和 Binding(綁定規(guī)則),決定將消息發(fā)送到哪些 Queue(隊列)。
  • Queue(隊列) 存儲消息,等待 Consumer(消費者) 消費。
  • Consumer(消費者) 從隊列中接收并處理消息。

Producer(生產(chǎn)者)

作用:負(fù)責(zé)發(fā)送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。

關(guān)鍵點

  • Producer 只需要知道 Exchange 和 Routing Key,不關(guān)心隊列。
  • Producer 不直接與隊列交互,消息的路由和存儲由 Exchange 和 Binding 決定。

代碼示例

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("Sent message: " + message);
    }
}

調(diào)用示例

producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");
  • direct-exchange:目標(biāo)交換機。
  • key1:消息的路由鍵。

Exchange(交換機)

作用:接收來自 Producer 的消息,并根據(jù) Routing Key 和 Binding 的配置,決定將消息發(fā)送到哪些隊列。

Exchange 通常需要手動注冊為 Bean。

  • RabbitMQ 的 Exchange 是通過名稱來標(biāo)識的。
  • 在 Spring Boot 中,您通過 @Bean 方法注冊 Exchange 時,實際上是將 Exchange 的名稱和類型綁定到 RabbitMQ 服務(wù)器。
  • 發(fā)送消息時,RabbitMQ 客戶端會根據(jù) Exchange 的名稱找到對應(yīng)的 Exchange,并根據(jù) Routing Key 將消息路由到隊列。

類型

  • Direct Exchange:精確匹配 Routing Key。消息的 Routing Key 必須與 Binding 的 Routing Key 完全一致。
  • Topic Exchange:支持通配符匹配。例如,with("key.*") 可以匹配 key.1、key.2 等。
  • Fanout Exchange:忽略 Routing Key,消息會被廣播到所有綁定的隊列。
  • Headers Exchange:忽略 Routing Key,根據(jù)消息頭屬性匹配。

代碼示例(定義交換機):

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExchangeConfig {
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct-exchange");
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-exchange");
    }
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic-exchange");
    }
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers-exchange");
    }
}

Queue(隊列)

作用:消息的存儲容器,等待消費者從中取出消息進行處理。

Queue 也需要手動注冊為 Bean。Spring Boot 不會自動注冊隊列,因為隊列的名稱和屬性(如是否持久化、是否排他等)需要根據(jù)業(yè)務(wù)需求進行配置。

關(guān)鍵點

  • 消息會保存在隊列中,直到被消費。
  • 隊列可以是持久化的(重啟 RabbitMQ 后消息仍然存在)或非持久化的。

代碼示例(定義隊列):

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
    @Bean
    public Queue demoQueue() {
        return new Queue("demo-queue", true); // 持久化隊列
    }
}

Routing Key(路由鍵)

作用:決定消息如何從交換機路由到隊列。

關(guān)鍵點

  • Routing Key 由 Producer 指定。
  • 在 Direct 和 Topic 類型的 Exchange 中,Routing Key 決定隊列是否接收消息。

Binding(綁定)

  • 作用:將隊列與交換機連接,并定義路由規(guī)則。
  • 關(guān)鍵點
    • Binding 定義了隊列接受消息的條件。
    • 結(jié)合 Routing Key 和交換機類型,共同決定消息的路由方式。

代碼示例(定義綁定):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BindingConfig {
    @Bean
    public Binding binding(Queue demoQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");
    }
}

with("key1") 的作用是 指定 Binding 的 Routing Key。它的含義是:

  • 當(dāng)消息發(fā)送到 Exchange 時,Exchange 會根據(jù)消息的 Routing Key 和 Binding 的 Routing Key 進行匹配。
  • 如果匹配成功,消息會被路由到對應(yīng)的隊列;如果匹配失敗,消息會被丟棄或進入死信隊列(如果有配置)。

Consumer(消費者)

作用:從隊列中接收并處理消息。

關(guān)鍵點

  • 消費者與隊列直接關(guān)聯(lián)。
  • 多個消費者可以監(jiān)聽同一隊列,實現(xiàn)負(fù)載均衡。

代碼示例

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

1.2 RabbitMQ 消息傳輸模型

點對點模型

定義:消息從生產(chǎn)者發(fā)送到隊列,由消費者從隊列中接收,消息只能被一個消費者消費。

實現(xiàn)

  • 使用默認(rèn)交換機(空字符串 "")。
  • 直接將消息發(fā)送到隊列。

代碼示例

rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");

發(fā)布訂閱模型

定義:生產(chǎn)者將消息發(fā)送到 Fanout 類型的交換機,消息會廣播到所有綁定的隊列。

實現(xiàn)

  • 不需要 Routing Key。
  • 所有綁定到 Fanout 交換機的隊列都會接收消息。

代碼示例

rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");

路由模型

定義:生產(chǎn)者將消息發(fā)送到 Direct 類型的交換機,根據(jù) Routing Key 精確匹配隊列。

實現(xiàn)

  • 隊列通過 Binding 綁定到交換機時,指定 Routing Key。
  • 消息的 Routing Key 必須與 Binding 的 Routing Key 一致。

代碼示例

rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");

2. 環(huán)境準(zhǔn)備

2.1 安裝與配置 RabbitMQ

下載 Docker

啟動 Docker

  • 安裝完成后,啟動 Docker Desktop。
  • 確保 Docker 正在運行(任務(wù)欄或菜單欄中可以看到 Docker 圖標(biāo))。

使用 Docker 快速部署 RabbitMQ

Docker 是部署 RabbitMQ 的最簡單方式。通過以下命令,您可以快速啟動一個 RabbitMQ 容器:

  • docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

參數(shù)說明

  • -d:以后臺模式運行容器。
  • --name rabbitmq:為容器指定名稱(rabbitmq)。
  • -p 5672:5672:將容器的 5672 端口映射到主機的 5672 端口(RabbitMQ 的消息通信端口)。
  • -p 15672:15672:將容器的 15672 端口映射到主機的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。
  • rabbitmq:management:使用帶有管理插件的 RabbitMQ 鏡像。

驗證 RabbitMQ 是否運行

運行以下命令,查看容器是否正常運行:

docker ps

如果看到 rabbitmq 容器正在運行,說明 RabbitMQ 已成功啟動。

2.2 使用 RabbitMQ 管理插件

RabbitMQ 提供了一個 Web 管理界面,方便您監(jiān)控和管理 RabbitMQ。

訪問管理界面

  • 打開瀏覽器,訪問 http://localhost:15672
  • 使用默認(rèn)用戶名和密碼登錄:
    • 用戶名:guest
    • 密碼:guest

管理界面功能

  • Overview:查看 RabbitMQ 的整體狀態(tài),如連接數(shù)、隊列數(shù)、消息速率等。
  • Connections:查看當(dāng)前連接到 RabbitMQ 的客戶端。
  • Channels:查看當(dāng)前打開的通道。
  • Exchanges:查看和管理 Exchange。
  • Queues:查看和管理 Queue。
  • Admin:管理用戶和權(quán)限。

2.3 用戶與權(quán)限配置

默認(rèn)情況下,RabbitMQ 只有一個用戶 guest,密碼也是 guest。為了安全性和權(quán)限管理,建議創(chuàng)建新用戶并分配權(quán)限。

1. 創(chuàng)建新用戶

  • 在 RabbitMQ 管理界面中:
  • 點擊頂部導(dǎo)航欄的 Admin
  • 在用戶列表下方,點擊 Add a user。
  • 輸入用戶名和密碼,例如:
    • 用戶名:admin
    • 密碼:admin123
  • 點擊 Add user 完成創(chuàng)建。

2. 分配權(quán)限

  • 在用戶列表中,找到剛創(chuàng)建的用戶(如 admin)。
  • 點擊用戶右側(cè)的 Set permission。
  • 在權(quán)限設(shè)置頁面:
    • Virtual Host:選擇 /(默認(rèn)的虛擬主機)。
    • Configure:輸入 .*,表示允許用戶配置所有資源。
    • Write:輸入 .*,表示允許用戶寫入所有資源。
    • Read:輸入 .*,表示允許用戶讀取所有資源。
  • 點擊 Set permission 完成權(quán)限分配。

3. 使用新用戶登錄

  • 退出當(dāng)前用戶(點擊右上角的 guest,選擇 Log out)。
  • 使用新用戶(如 admin)登錄。

2.4  Spring Boot 中引入 RabbitMQ 依賴 

在 pom.xml 中添加以下依賴:

<dependencies>
    <!-- RabbitMQ 依賴 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

spring-boot-starter-amqp 是 Spring Boot 提供的 RabbitMQ 集成依賴,它包含了以下內(nèi)容:

RabbitMQ 客戶端庫

  • 自動引入 RabbitMQ 的 Java 客戶端庫(amqp-client),用于與 RabbitMQ 服務(wù)器通信。

Spring AMQP 支持

  • 提供了 Spring 對 AMQP(Advanced Message Queuing Protocol)的支持,包括 RabbitTemplate、@RabbitListener 等。

2.5 Spring Boot 配置 RabbitMQ

在 Spring Boot 項目中,您需要在 application.properties 或 application.yml 中配置 RabbitMQ 的連接信息。

示例配置

  • # RabbitMQ 連接配置
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin123

配置說明

  • spring.rabbitmq.host:RabbitMQ 服務(wù)器地址(默認(rèn) localhost)。
  • spring.rabbitmq.port:RabbitMQ 消息通信端口(默認(rèn) 5672)。
  • spring.rabbitmq.username:RabbitMQ 用戶名。
  • spring.rabbitmq.password:RabbitMQ 密碼。

3. Spring Boot 集成 RabbitMQ 的消息生產(chǎn)和消費

3.1 消息生產(chǎn)者(Producer)

  • 在 Spring Boot 中,我們使用 RabbitTemplate 來發(fā)送消息。它由 spring-boot-starter-amqp 自動配置成為一個 Bean,可直接通過 @Autowired 注入。
  • 如果 message 不是 String 類型的處理 Spring AMQP(spring-boot-starter-amqp)在使用 RabbitTemplate 時,默認(rèn)的消息轉(zhuǎn)換器(MessageConverter)通常會將對象序列化為 JSON 或者將字符串消息轉(zhuǎn)換為字節(jié)。
  • 如果你的業(yè)務(wù)數(shù)據(jù)不是 String,常見做法是:
    • 在發(fā)送時把非字符串對象序列化(如轉(zhuǎn)換為 JSON 字符串);
    • 或者配置自定義的 MessageConverter,讓 Spring 幫你把對象自動序列化/反序列化。

典型做法:手動序列化為 JSON 再發(fā)送

@Service
public class CustomObjectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendCustomObject(String queueName, MyCustomObject obj) {
        // 1. 將自定義對象序列化為 JSON 字符串
        String jsonString = new Gson().toJson(obj);
        // 2. 發(fā)送 JSON 字符串到 RabbitMQ
        rabbitTemplate.convertAndSend(queueName, jsonString);
    }
}

在消費者端,你也可以將消息(JSON 字符串)反序列化為 MyCustomObject。

配置自定義 Converter(可選)

Spring AMQP 提供了 Jackson2JsonMessageConverter 等現(xiàn)成轉(zhuǎn)換器。

@Configuration
public class RabbitConfig {
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    // 配置 RabbitTemplate 使用該轉(zhuǎn)換器
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}

這樣一來,rabbitTemplate.convertAndSend(queueName, myObject) 會自動把 myObject 轉(zhuǎn)成 JSON 發(fā)送;消費者端則自動解析為同樣的 Java 對象。 1)基本消息發(fā)送

場景
將消息直接發(fā)送到指定的隊列,跳過交換機的路由,讓 RabbitMQ 把消息放到這個隊列中。

核心代碼示例

@Service
public class BasicProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;  // 1.自動注入的 RabbitTemplate
    /**
     * 2.發(fā)送基本消息到指定的隊列
     * @param queueName  目標(biāo)隊列名稱
     * @param message    消息內(nèi)容
     */
    public void sendToQueue(String queueName, String message) {
        // 3.調(diào)用 convertAndSend,直接將消息放入指定隊列
        rabbitTemplate.convertAndSend(queueName, message);
        System.out.println("Message sent to queue: " + queueName + ", content: " + message);
    }
}

代碼詳解

  • @Autowired private RabbitTemplate rabbitTemplate;`
    • Spring Boot 自動為我們配置了 RabbitTemplate,不用手動定義 Bean。
    • 通過依賴注入即可使用所有與 RabbitMQ 交互的方法。
  • public void sendToQueue(String queueName, String message)
  • 方法參數(shù)包括:
    • queueName: 目標(biāo)隊列的名稱。
    • message: 要發(fā)送的字符串類型消息內(nèi)容。
  • rabbitTemplate.convertAndSend(queueName, message)
    • convertAndSend 方法會將消息轉(zhuǎn)換(轉(zhuǎn)換為字節(jié))并發(fā)送到指定隊列。
    • 如果該隊列不存在,RabbitMQ 會嘗試自動創(chuàng)建(前提是 Broker 端配置允許自動創(chuàng)建隊列)。

2)發(fā)送到交換機

場景
將消息發(fā)送到一個交換機(Exchange),再由交換機通過 Routing Key 將消息路由到匹配的隊列中。

核心代碼示例

@Service
public class ExchangeProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送消息到指定交換機
     * @param exchangeName  交換機名稱
     * @param routingKey    路由鍵
     * @param message       消息內(nèi)容
     */
    public void sendToExchange(String exchangeName, String routingKey, String message) {
        // 將消息發(fā)送到 exchangeName 指定的交換機,使用 routingKey 進行路由
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
        System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);
    }
}

代碼詳解

  • exchangeName
    • 要發(fā)送到的交換機名稱,例如 "direct-exchange"、"fanout-exchange" 等。
  • routingKey
    • 路由鍵,用來匹配綁定(Binding)。例如:對 DirectExchange 而言,需要隊列綁定時的路由鍵與發(fā)送時的路由鍵相同,消息才能到達隊列。
    • rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
  • 將消息先發(fā)送到交換機,再根據(jù)路由鍵將消息投遞到目標(biāo)隊列。

3)發(fā)送帶消息屬性的消息

場景
需要為消息設(shè)置 TTL(過期時間)或優(yōu)先級等屬性,控制消息在隊列中的行為。

核心代碼示例

@Service
public class PropertyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送帶消息屬性的消息(如 TTL, 優(yōu)先級)
     */
    public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {
        // 1.創(chuàng)建 MessageProperties 對象,用于指定消息的屬性
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("10000"); // 過期時間:10秒 (單位:毫秒)
        properties.setPriority(5);        // 優(yōu)先級設(shè)為 5
        // 2.根據(jù)消息體和屬性構(gòu)建 Message 對象
        Message message = new Message(messageContent.getBytes(), properties);
        // 3.使用 send 方法(而非 convertAndSend)直接發(fā)送 Message 對象
        rabbitTemplate.send(exchange, routingKey, message);
        System.out.println("Message with properties sent: " + messageContent);
    }
}

代碼詳解

  • MessageProperties properties = new MessageProperties();
    • MessageProperties 用于設(shè)置 AMQP 協(xié)議層的各種消息頭信息。
  • properties.setExpiration("10000");
    • setExpiration 設(shè)置消息的 TTL(Time-To-Live),單位是毫秒。如果到達時間后消息仍未被消費,RabbitMQ 會將其從隊列中移除并送入死信隊列(如果配置了死信隊列)。
    • properties.setPriority(5);
  • 設(shè)置消息的優(yōu)先級為 5,前提是隊列本身需要支持優(yōu)先級隊列(創(chuàng)建隊列時指定 x-max-priority)。
    • new Message(messageContent.getBytes(), properties)
  • 將純文本消息轉(zhuǎn)換為 Message 對象,結(jié)合了消息屬性和消息體。
    • rabbitTemplate.send(exchange, routingKey, message);
  • convertAndSend 不同,它不會嘗試進行消息轉(zhuǎn)換(如 JSON、字符串),而是直接發(fā)送完整的 AMQP Message 對象。

Message 構(gòu)造函數(shù) 

public Message(byte[] body, MessageProperties messageProperties) {
    this.body = body;
    this.messageProperties = messageProperties;
}
body:消息體的字節(jié)數(shù)組。
messageProperties:AMQP 的消息屬性,包括 TTL、優(yōu)先級、headers 等。、

如果消息體不是String類型

手動轉(zhuǎn)換為字節(jié):你可以先將自定義對象轉(zhuǎn)換為字節(jié)數(shù)組(例如通過 JSON 序列化或 Java 序列化),再放入 new Message(...) 的第一個參數(shù)。
MyCustomObject obj = new MyCustomObject();
// 假設(shè)你想用 JSON
String jsonString = new Gson().toJson(obj);
byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);
MessageProperties properties = new MessageProperties();
// 設(shè)置一些屬性
Message message = new Message(body, properties);
  • 為什么不會自動轉(zhuǎn) JSON?使用 new Message(...) 構(gòu)造方法是“純” AMQP 層的做法,不會調(diào)用 Spring 的轉(zhuǎn)換器,因此你必須自己處理序列化。
  • 使用 Message 構(gòu)造函數(shù) 時,你必須自行處理對象到 byte[] 的轉(zhuǎn)換(無論是字符串、JSON,還是其他格式)。
  • 如果想讓 Spring AMQP 自動轉(zhuǎn)換,你通常使用 rabbitTemplate.convertAndSend(Object msg) 這種高級 API,或者配置自定義 MessageConverter。

3.2 消息消費者(Consumer)

消費者的核心功能是在指定的隊列中監(jiān)聽消息,并根據(jù)配置的確認(rèn)模式(自動確認(rèn)或手動確認(rèn))對消息進行處理或拒絕。

1)監(jiān)聽隊列并消費消息

核心代碼示例(自動確認(rèn)模式)

@Service
public class Consumer {
    /**
     * 使用注解 @RabbitListener 指定要監(jiān)聽的隊列
     * 由于默認(rèn)為 auto-ack 模式,
     * 當(dāng)消息到達后,RabbitMQ 會自動確認(rèn)并從隊列中刪除該消息。
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        // 1.從 queueName 隊列中取到的消息內(nèi)容
        System.out.println("Received message: " + message);
        // 2.在 auto-ack 模式下,無需手動 ack
        //  如果這里出現(xiàn)異常,RabbitMQ 不會再次發(fā)送消息給消費者,消息會丟失。
    }
}

代碼詳解(自動確認(rèn)模式)

  • @RabbitListener(queues = "demo-queue")
    • 聲明監(jiān)聽名為 demo-queue 的隊列。
    • 一旦有新消息到達該隊列,就會自動回調(diào)此方法。
  • public void receiveMessage(String message)
    • 默認(rèn)參數(shù)類型為字符串,當(dāng) RabbitMQ 收到消息后會嘗試將其轉(zhuǎn)換為 String 并注入到 message 中。
  • 自動確認(rèn)(auto-ack)的風(fēng)險
    • 如果消費者在處理消息時拋出異常,消息已經(jīng)被 RabbitMQ 標(biāo)記為“已確認(rèn)”,不會再重新發(fā)送或進入死信隊列,導(dǎo)致消息丟失。

2)確認(rèn)機制

自動確認(rèn)(auto-ack)

  • 行為
    • 當(dāng)消費者從隊列中獲取消息后,RabbitMQ 會立即將該消息標(biāo)記為已確認(rèn)(acknowledged),并從隊列中刪除。
  • 問題
    • 如果消息處理失?。ɡ缦M者拋出異常),消息已經(jīng)被確認(rèn)并從隊列中刪除,無法重新處理。
    • 如果消費者崩潰或斷開連接,未處理的消息會丟失。
  • 適用場景
    • 對消息處理的可靠性要求不高的場景。

手動確認(rèn)(manual-ack)

  • 行為
    • 消費者處理完消息后,必須顯式調(diào)用 basicAck 方法確認(rèn)消息。
    • 如果消息處理失敗,可以調(diào)用 basicNack 或 basicReject 方法拒絕消息。
  • 優(yōu)點
    • 確保消息處理的可靠性。
    • 支持消息重新入隊或發(fā)送到死信隊列。
  • 適用場景
    • 對消息處理的可靠性要求較高的場景。

核心代碼示例:

@Service
public class ManualAckConsumer {
    /**
     * 在 application.properties 中配置:
     * spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * 使得 RabbitMQ 使用手動確認(rèn)模式
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        try {
            // 1.從消息中獲取消息體
            String body = new String(message.getBody());
            System.out.println("Processing message: " + body);
            // 2.如果業(yè)務(wù)處理成功,則調(diào)用 basicAck 手動確認(rèn)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
            // 3.如果處理失敗,需要決定是重新入隊還是拒絕并進入死信隊列
            // requeue = true  -> 重新入隊
            // requeue = false -> 丟棄或進入死信隊列(根據(jù)隊列配置)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

代碼詳解

配置手動確認(rèn)

application.properties 添加

spring.rabbitmq.listener.simple.acknowledge-mode=manual

表示 Spring AMQP 使用手動確認(rèn)模式(manual-ack)。

public void receiveMessage(Message message, Channel channel)

與自動確認(rèn)不同,這里不僅接收字符串,還接收了 org.springframework.amqp.core.Message 對象和 com.rabbitmq.client.Channel。Message:包含消息體(body)和消息屬性(headers 等)。Channel:給我們提供了 basicAck, basicNack, basicReject 等底層 AMQP 操作。

手動確認(rèn)成功

channel.basicAck(deliveryTag, multiple)

deliveryTag

本次消息的唯一標(biāo)記,從 message.getMessageProperties().getDeliveryTag() 獲取。

multiple = false:只確認(rèn)當(dāng)前這條消息。

basicAck(long deliveryTag, boolean multiple)

這里的 deliveryTag 并不是在你構(gòu)造 Message 時生成的,而是 RabbitMQ Broker 在投遞消息給消費者時由底層 AMQP 協(xié)議自動分配的一個遞增的序號。
long deliveryTag = message.getMessageProperties().getDeliveryTag();

手動確認(rèn)失敗

  • channel.basicNack(deliveryTag, multiple, requeue)basicReject
    • requeue = true:將消息重新放回隊列等待下一次消費(可能導(dǎo)致死循環(huán),如處理一直失?。?/li>
    • requeue = false:拒絕消息,若配置了死信隊列,則進入死信隊列;否則丟棄消息。

3)處理消費失敗

  • 自動確認(rèn)模式下的處理
  • 在自動確認(rèn)模式下,如果消息處理失敗,RabbitMQ 不會重新發(fā)送消息,因為消息已經(jīng)被確認(rèn)并從隊列中刪除。
  • 問題
  • 消息丟失,無法重新處理。
  • 手動確認(rèn)模式下的處理
  • 在手動確認(rèn)模式下,如果消息處理失敗,可以通過以下方式處理:
  • 重新入隊
  • 調(diào)用 basicNack 或 basicReject 方法,并將 requeue 參數(shù)設(shè)置為 true。
  • 消息會重新進入隊列,等待下一次消費。
  • 發(fā)送到死信隊列
  • 調(diào)用 basicNack 或 basicReject 方法,并將 requeue 參數(shù)設(shè)置為 false
  • 如果隊列配置了死信隊列,消息會被發(fā)送到死信隊列。
  • 重試機制(Spring AMQP 提供的簡單重試)(只支持手動確認(rèn)機制)

是重試失敗了才會將消息重新入隊 ,所以重試在前,重新入隊在后

# 啟用重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重試次數(shù)
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始重試間隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 間隔倍數(shù)
spring.rabbitmq.listener.simple.retry.multiplier=2.0
# 最大重試間隔
spring.rabbitmq.listener.simple.retry.max-interval=10000
  • Spring AMQP 提供了 重試機制,可以在消費者處理消息失敗時,自動進行多次重試,而不是直接將消息重新入隊。

行為

  • 當(dāng)消息處理失敗時,Spring AMQP 會在 本地 進行重試(即不將消息重新入隊),直到達到最大重試次數(shù)。
  • 如果重試次數(shù)用盡,消息會被拒絕(basicNack 或 basicReject),并根據(jù)配置決定是否重新入隊或發(fā)送到死信隊列。

死信隊列(DLQ)

  • 當(dāng)消息被拒絕或過期時,RabbitMQ 會將其發(fā)送到我們配置的死信交換機(DLX),再路由到死信隊列(DLQ)。
  • 配置示例
@Configuration
public class RabbitConfig {
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal-queue")
                .withArgument("x-dead-letter-exchange", "dead-letter-exchange")  // 指定死信交換機
                .withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由鍵
                .build();
    }
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead-letter-exchange");
    }
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead-letter-queue");
    }
    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
    }
}

原理

  • 正常隊列通過 x-dead-letter-exchange 指定死信交換機,一旦消息被拒絕(requeue=false)或超時(TTL 到期),RabbitMQ 會把消息發(fā)送到 dead-letter-exchange。
  • dead-letter-exchangedead-letter-queue 進行綁定(路由鍵 dead-letter-routing-key),從而實現(xiàn)死信隊列的存儲。

重新入隊 vs 發(fā)送到死信隊列

  • 重新入隊channel.basicNack(deliveryTag, false, true)
  • 適用于臨時性錯誤,比如數(shù)據(jù)庫鎖沖突、網(wǎng)絡(luò)抖動等,等待后續(xù)重新處理。
  • 發(fā)送到死信隊列channel.basicNack(deliveryTag, false, false)
  • 適用于永久性錯誤,比如消息格式無法解析,或業(yè)務(wù)邏輯指定不應(yīng)再嘗試。

到此這篇關(guān)于Spring boot框架下的RabbitMQ消息中間件的文章就介紹到這了,更多相關(guān)Spring boot RabbitMQ消息中間件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 基于Java Springboot + Vue + MyBatis實現(xiàn)音樂播放系統(tǒng)

    基于Java Springboot + Vue + MyBatis實現(xiàn)音樂播放系統(tǒng)

    這篇文章主要介紹了一個完整的音樂播放系統(tǒng)是基于Java Springboot + Vue + MyBatis編寫的,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-08-08
  • IDEA入門級使用教程你居然還在用eclipse?

    IDEA入門級使用教程你居然還在用eclipse?

    上個月,idea的使用量超越eclipse的消息席卷了整個IT界,idea到底好在哪里呢?下面小編通過本文給大家詳細(xì)介紹下IDEA入門級使用教程,非常詳細(xì),感興趣的朋友一起看看吧
    2020-10-10
  • java8 stream多字段排序的實現(xiàn)

    java8 stream多字段排序的實現(xiàn)

    這篇文章主要介紹了java8 stream多字段排序的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Springboot解決no main manifest attribute錯誤

    Springboot解決no main manifest attribute錯誤

    在開發(fā)Springboot項目時,使用java -jar命令運行jar包可能出現(xiàn)no main manifest attribute錯誤,本文就來介紹一下該錯誤的解決方法,感興趣的可以了解一下
    2024-09-09
  • Idea熱加載插件JRebel激活以及使用教程

    Idea熱加載插件JRebel激活以及使用教程

    JRebel是一款JVM插件,它使得Java代碼修改后不用重啟系統(tǒng),立即生效,下面這篇文章主要給大家介紹了關(guān)于Idea熱加載插件JRebel激活以及使用的相關(guān)資料,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2023-02-02
  • SpringBoot實現(xiàn)自動配置的方式詳解

    SpringBoot實現(xiàn)自動配置的方式詳解

    Spring Boot 自動配置 是其核心特性之一,它通過智能化的默認(rèn)配置減少了開發(fā)者的工作量,自動配置的原理基于條件化配置和 Spring 的 @Configuration 機制,本文給大家講解了SpringBoot實現(xiàn)自動配置的過程,需要的朋友可以參考下
    2025-04-04
  • 一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J

    一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J

    本文主要介紹了一文了解Java Log框架徹底搞懂Log4J,Log4J2,LogBack,SLF4J,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • 使用Java判定一個數(shù)值是否在指定的開閉區(qū)間范圍內(nèi)

    使用Java判定一個數(shù)值是否在指定的開閉區(qū)間范圍內(nèi)

    這篇文章主要給大家介紹了關(guān)于使用Java判定一個數(shù)值是否在指定的開閉區(qū)間范圍內(nèi)的相關(guān)資料,文中通過實例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Java具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2022-09-09
  • 用Java8 stream處理數(shù)據(jù)

    用Java8 stream處理數(shù)據(jù)

    這篇文章主要介紹了用Java8 stream處理數(shù)據(jù),Java 8 API的設(shè)計者重新提出了一個新的抽象稱為流Stream,可以讓我們以一種聲明的方式處理數(shù)據(jù),此外,數(shù)據(jù)流可以充分利用多核架構(gòu)而無需編寫多線程的一行代碼,下面我們一起來看看文章詳細(xì)介紹
    2021-11-11
  • SpringBoot集成Redis實現(xiàn)驗證碼的簡單案例

    SpringBoot集成Redis實現(xiàn)驗證碼的簡單案例

    本文主要介紹了SpringBoot集成Redis實現(xiàn)驗證碼的簡單案例,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08

最新評論