SpringBoot中的RabbitMQ用法詳解
Spring Boot中的 RabbitMQ 是什么,如何使用
RabbitMQ 的基本概念
在使用 RabbitMQ 之前,我們需要了解一些基本概念。
消息隊(duì)列
消息隊(duì)列是一種用于異步通信的機(jī)制。消息發(fā)送者將消息發(fā)送到隊(duì)列中,消息接收者從隊(duì)列中獲取消息。通過消息隊(duì)列,可以實(shí)現(xiàn)消息的異步傳遞,降低系統(tǒng)之間的耦合性。
消息
消息是指需要傳遞的數(shù)據(jù)。
生產(chǎn)者
生產(chǎn)者是指向消息隊(duì)列發(fā)送消息的程序。
消費(fèi)者
消費(fèi)者是指從消息隊(duì)列中獲取消息并處理的程序。
隊(duì)列
隊(duì)列是指消息存儲(chǔ)的地方。
交換機(jī)
交換機(jī)是用于接收生產(chǎn)者發(fā)送的消息,并將消息路由到相應(yīng)的隊(duì)列中。
路由鍵
路由鍵是一個(gè)字符串,用于指定消息應(yīng)該被路由到哪個(gè)隊(duì)列中。
綁定
綁定是指將隊(duì)列和交換機(jī)連接起來的過程。
如何在 Spring Boot 中使用 RabbitMQ
添加依賴
首先,我們需要在 Maven 或 Gradle 中添加 RabbitMQ 的依賴。在 Maven 中,我們可以添加以下依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在 Gradle 中,我們可以添加以下依賴:
implementation 'org.springframework.boot:spring-boot-starter-amqp'
配置 RabbitMQ
在 Spring Boot 中,我們可以使用 application.yml 或者 application.properties 文件來配置 RabbitMQ。
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
創(chuàng)建生產(chǎn)者
下面是一個(gè)簡(jiǎn)單的 RabbitMQ 生產(chǎn)者示例:
@Component public class RabbitMQProducer { private final RabbitTemplate rabbitTemplate; public RabbitMQProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void send(String message) { rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message); } }
在這個(gè)示例中,我們使用了 Spring AMQP 提供的 RabbitTemplate 類來發(fā)送消息。convertAndSend
方法用于將消息發(fā)送到指定的交換機(jī)和路由鍵。
創(chuàng)建消費(fèi)者
下面是一個(gè)簡(jiǎn)單的 RabbitMQ 消費(fèi)者示例:
@Component public class RabbitMQConsumer { @RabbitListener(queues = "myQueue") public void receive(String message) { System.out.println("Received message: " + message); } }
在這個(gè)示例中,我們使用了 Spring AMQP 提供的 @RabbitListener
注解來指定消費(fèi)者應(yīng)該監(jiān)聽哪個(gè)隊(duì)列。當(dāng)有消息到達(dá)隊(duì)列時(shí),receive
方法會(huì)被調(diào)用,并且接收到消息的內(nèi)容會(huì)作為參數(shù)傳遞給該方法。
運(yùn)行示例
現(xiàn)在我們已經(jīng)創(chuàng)建了一個(gè)簡(jiǎn)單的 RabbitMQ 應(yīng)用程序。我們可以在 main 方法中創(chuàng)建 Spring Boot 應(yīng)用程序,并在其中注入我們的生產(chǎn)者和消費(fèi)者。然后,我們可以使用生產(chǎn)者向隊(duì)列發(fā)送消息,消費(fèi)者將會(huì)接收到這些消息并輸出到控制臺(tái)。
@SpringBootApplication public class Application implements CommandLineRunner { private final RabbitMQProducer rabbitMQProducer; private final RabbitMQConsumer rabbitMQConsumer; public Application(RabbitMQProducer rabbitMQProducer, RabbitMQConsumer rabbitMQConsumer) { this.rabbitMQProducer = rabbitMQProducer; this.rabbitMQConsumer = rabbitMQConsumer; } public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Override public void run(String... args) throws Exception { rabbitMQProducer.send("Hello, RabbitMQ!"); } }
運(yùn)行程序后,我們可以在控制臺(tái)看到類似于以下輸出:
Received message: Hello, RabbitMQ!
這表明我們已經(jīng)成功地向隊(duì)列發(fā)送了一條消息,并且消費(fèi)者已經(jīng)成功地接收到了這個(gè)消息。
RabbitMQ 高級(jí)功能
除了基本功能之外,RabbitMQ 還提供了一些高級(jí)功能,例如:
消息確認(rèn)
當(dāng)生產(chǎn)者發(fā)送一條消息時(shí),它并不知道這條消息是否已經(jīng)被成功處理。如果消息沒有被成功處理,生產(chǎn)者將會(huì)不斷地嘗試重發(fā)這條消息,直到它被成功處理為止。
為了解決這個(gè)問題,RabbitMQ 提供了消息確認(rèn)機(jī)制。當(dāng)生產(chǎn)者發(fā)送一條消息時(shí),它可以請(qǐng)求 RabbitMQ 確認(rèn)這條消息是否已經(jīng)被成功處理。如果消息已經(jīng)被成功處理,RabbitMQ 將會(huì)發(fā)送一個(gè)確認(rèn)消息給生產(chǎn)者。
@Component public class RabbitMQProducer { private final RabbitTemplate rabbitTemplate; public RabbitMQProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void send(String message) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); } @Bean public ConfirmCallback confirmCallback() { return (correlationData, ack, cause) -> { if (ack) { System.out.println("Message with correlation id " + correlationData.getId() + " has been confirmed"); } else { System.out.println("Message with correlation id " + correlationData.getId() + " has been rejected: " + cause); } }; } }
在這個(gè)示例中,我們?cè)谏a(chǎn)者中使用了 CorrelationData 類來跟蹤消息。我們還創(chuàng)建了一個(gè) ConfirmCallback bean 來處理消息確認(rèn)。當(dāng)消息被成功處理時(shí),confirmCallback 方法將會(huì)被調(diào)用,并輸出一條確認(rèn)消息。當(dāng)消息被拒絕時(shí),confirmCallback 方法也將會(huì)被調(diào)用,并輸出一條拒絕消息。
消息持久化
默認(rèn)情況下,RabbitMQ 不會(huì)將消息持久化到磁盤上。如果 RabbitMQ 在崩潰之前沒有將消息發(fā)送給消費(fèi)者,這些消息將會(huì)丟失。
為了解決這個(gè)問題,我們可以將消息標(biāo)記為持久化。這樣,即使 RabbitMQ 崩潰,消息也會(huì)被保存到磁盤上,并在 RabbitMQ 重啟后重新發(fā)送給消費(fèi)者。
@Component public class RabbitMQProducer { private final RabbitTemplate rabbitTemplate; public RabbitMQProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void send(String message) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message messageObject = new Message(message.getBytes(), messageProperties); rabbitTemplate.send("myExchange", "myRoutingKey", messageObject); } }
在這個(gè)示例中,我們?cè)谏a(chǎn)者中使用了 MessageProperties
類來設(shè)置消息的持久化屬性。我們還使用了 MessageDeliveryMode.PERSISTENT
枚舉值來標(biāo)記消息為持久化消息。
消息 TTL
消息 TTL(Time To Live)是指消息在隊(duì)列中存儲(chǔ)的時(shí)間。如果消息在指定的時(shí)間內(nèi)沒有被消費(fèi)者消費(fèi),它將會(huì)被從隊(duì)列中自動(dòng)刪除。
@Component public class RabbitMQProducer { private final RabbitTemplate rabbitTemplate; public RabbitMQProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void send(String message) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("5000"); // 5 seconds Message messageObject = new Message(message.getBytes(), messageProperties); rabbitTemplate.send("myExchange", "myRoutingKey", messageObject); } }
在這個(gè)示例中,我們?cè)谏a(chǎn)者中使用了 MessageProperties
類來設(shè)置消息的 TTL 屬性。我們將 messageProperties.setExpiration("5000")
設(shè)置為 5000 毫秒,這意味著消息在隊(duì)列中最多存儲(chǔ) 5 秒鐘。
死信隊(duì)列
死信隊(duì)列是指當(dāng)消息被拒絕或者過期時(shí),它將會(huì)被重新路由到另一個(gè)隊(duì)列中。這個(gè)隊(duì)列就被稱為死信隊(duì)列。
@Configuration public class RabbitMQConfig { @Bean public Queue myQueue() { return QueueBuilder.durable("myQueue") .withArgument("x-dead-letter-exchange", "myDeadLetterExchange") .withArgument("x-dead-letter-routing-key", "myDeadLetterRoutingKey") .build(); } @Bean public Queue myDeadLetterQueue() { return QueueBuilder.durable("myDeadLetterQueue").build(); } @Bean public Exchange myExchange() { return ExchangeBuilder.directExchange("myExchange").durable(true).build(); } @Bean public Exchange myDeadLetterExchange() { return ExchangeBuilder.directExchange("myDeadLetterExchange").durable(true).build(); } @Bean public Binding binding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey").noargs(); } @Bean public Binding deadLetterBinding() { return BindingBuilder.bind(myDeadLetterQueue()).to(myDeadLetterExchange()).with("myDeadLetterRoutingKey").noargs(); } }
在這個(gè)示例中,我們創(chuàng)建了一個(gè)名為 myQueue
的隊(duì)列,并使用 withArgument
方法來指定它的死信交換機(jī)和路由鍵。我們還創(chuàng)建了一個(gè)名為 myDeadLetterQueue
的隊(duì)列,并將其綁定到名為 myDeadLetterExchange
的交換機(jī)上。最后,我們創(chuàng)建了綁定,將 myQueue
隊(duì)列綁定到 myExchange
交換機(jī)上。
當(dāng)消息在 myQueue
中被拒絕或過期時(shí),它將會(huì)被重新路由到 myDeadLetterExchange
交換機(jī),并將其路由到 myDeadLetterQueue
隊(duì)列中。
總結(jié)
本文介紹了 RabbitMQ 的基本概念,以及如何在 Spring Boot 中使用 RabbitMQ。我們還介紹了 RabbitMQ 的一些高級(jí)功能,包括消息確認(rèn)、消息持久化、消息 TTL 和死信隊(duì)列。通過學(xué)習(xí)本文,你應(yīng)該已經(jīng)有了足夠的知識(shí)來開始在 Spring Boot 中使用 RabbitMQ 了。
以上就是SpringBoot中的RabbitMQ的用法詳解的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot RabbitMQ的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringFramework中的數(shù)據(jù)校驗(yàn)方式
這篇文章主要介紹了SpringFramework中的數(shù)據(jù)校驗(yàn)方式,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-12-12springcloud feign docker上無法通訊的問題及解決
這篇文章主要介紹了springcloud feign docker上無法通訊的問題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03java基礎(chǔ)之Collection與Collections和Array與Arrays的區(qū)別
這篇文章主要介紹了java基礎(chǔ)之Collection與Collections和Array與Arrays的區(qū)別的相關(guān)資料,本文主要說明兩者的區(qū)別以防大家混淆概念,需要的朋友可以參考下2017-08-08Springboot啟動(dòng)原理和自動(dòng)配置原理解析
這篇文章主要介紹了Springboot啟動(dòng)原理和自動(dòng)配置原理解析,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04關(guān)于JSON.toJSONString()和Gson.toJson()方法的比較
本文介紹了兩種將Java對(duì)象轉(zhuǎn)換為JSON字符串的方法:阿里的`JSON.toJSONString()`和谷歌的`Gson.toJson()`,通過一個(gè)示例,展示了當(dāng)使用繼承關(guān)系且子類覆蓋父類字段時(shí),`Gson`會(huì)報(bào)錯(cuò),而`JSON`可以正常運(yùn)行,作者建議在處理JSON相關(guān)操作時(shí)使用阿里的`JSON`類2024-11-11mybatis和mybatis-plus設(shè)置值為null不起作用問題及解決
Mybatis-Plus的FieldStrategy主要用于控制新增、更新和查詢時(shí)對(duì)空值的處理策略,通過配置不同的策略類型,可以靈活地處理實(shí)體對(duì)象的空值問題2025-02-02