一文掌握Springboot集成RabbitMQ的方法
1、前言
消息隊列(Message Queue,簡稱 MQ)是一種異步的消息傳遞中間件,它解耦了應用程序之間的通信。應用程序可以將消息發(fā)送到隊列,而無需知道誰會接收這些消息。接收應用程序可以從隊列中檢索消息,而無需知道誰發(fā)送了這些消息。消息隊列是一種重要的中間件,它可以幫助應用程序之間進行異步、可靠、可擴展的通信。常見的消息隊列中間件有ActiveMQ,RabbitMQ,Kafka......今天我們就來介紹RabbitMQ。
2、什么是RabbitMQ
RabbitMQ 是一個開源的消息隊列服務器,它實現(xiàn)了 AMQP (高級消息隊列協(xié)議) 標準。AMQP 是一種應用層協(xié)議,為面向消息的中間件設(shè)計,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品、開發(fā)語言等條件的限制。
AMQP :Advanced Message Queue,高級消息隊列協(xié)議。它是應用層協(xié)議的一個開放標準,為面向消息的中間件設(shè)計,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品、開發(fā)語言等條件的限制。
RabbitMQ 的主要特點包括:
- 高性能:RabbitMQ 能夠處理大量的消息,并提供低延遲的性能。
- 可靠性:RabbitMQ 提供持久化消息存儲,確保消息不會丟失。
- 可擴展性:RabbitMQ 可以輕松擴展以滿足不斷增長的需求。
- 靈活性:RabbitMQ 支持多種編程語言和客戶端,并提供豐富的功能和配置選項。
RabbitMQ 的常見應用場景包括:
- 分布式系統(tǒng):RabbitMQ 可以用于在分布式系統(tǒng)中進行異步通信。
- 異步處理:RabbitMQ 可以用于異步處理任務,提高系統(tǒng)的性能和效率。
- 消息隊列:RabbitMQ 可以用于實現(xiàn)消息隊列,例如任務隊列、發(fā)布/訂閱隊列等。
- 消息通知:RabbitMQ 可以用于發(fā)送消息通知,例如電子郵件或短信。
3、安裝RabbitMQ
由于RabbitMQ是一個由 Erlang 語言開發(fā)的 AMQP 的開源實現(xiàn)。所以在安裝RabbitMQ前需要先安裝Erlang環(huán)境。
Erlang下載地址:Downloads - Erlang/OTP
RabbitMQ下載地址:Installing RabbitMQ | RabbitMQ
先安裝Erlang,在安裝RabbitMQ。安裝工程相對簡單,無腦下一步即可。
安裝完RabbitMQ后,打開cmd窗口,進入RabbitMQ的安裝目錄的sbin下,我的目錄是:
D:\RabbitMQ Server\rabbitmq_server-3.13.0\sbin
然后輸入以下命令安裝一下插件:
rabbitmq-plugins enable rabbitmq_management
提示以下這個就是安裝成功。
驗證RabbitMQ是否安裝成功,輸入以下命令:
rabbitmqctl status
這時候,直接訪問http://127.0.0.1:15672就可以看到RabbitMQ的管理頁面了,RabbitMQ默認端口為15672,默認的管理頁面賬號密碼均為guest。
登錄后,就可以看到一個初始的管理界面:
4、Springboot集成RabbitMQ
4.1、添加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-rabbitmq</name> <description>springboot-rabbitmq</description> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.24</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
4.2、添加配置
# rabbitmq連接配置信息 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 確保消息在未被隊列接收時返回 spring.rabbitmq.publisher-returns=true # 發(fā)布消息成功到交換器后會觸發(fā)回調(diào)方法 spring.rabbitmq.publisher-confirm-type=correlated
4.3、添加controller,作為生產(chǎn)者
新建controller,用于發(fā)送消息。
package com.example.springbootrabbitmq.controller; import com.example.springbootrabbitmq.config.MqProducerCallBack; import jakarta.annotation.Resource; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("push/message") public class PushMessageController { @Resource private RabbitTemplate rabbitTemplate; @Resource private MqProducerCallBack mqProducerCallBack; @GetMapping("test") public String sendMessage() { // correlationData:對象內(nèi)部只有一個 id 屬性,用來表示當前消息的唯一性。 CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis()); // 消息確認和返回回調(diào) rabbitTemplate.setConfirmCallback(mqProducerCallBack); rabbitTemplate.setReturnsCallback(mqProducerCallBack); // 消息發(fā)送 rabbitTemplate.convertAndSend("my-queue", "hello world", message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, correlationData); return "publisher success..."; } }
4.4、設(shè)置生產(chǎn)者消息確認CallBack
package com.example.springbootrabbitmq.config; import cn.hutool.json.JSONUtil; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Component public class MqProducerCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { /** * correlationData:對象內(nèi)部只有一個 id 屬性,用來表示當前消息的唯一性。 * ack:消息投遞到broker 的狀態(tài),true成功,false失敗。 * cause:投遞失敗的原因。 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.err.println("消息ID=" + correlationData.getId() + "投遞失敗,失敗原因:" + cause); } else { System.out.println("消息投遞收到確認,correlationData=" + correlationData.getId()); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("返回消息結(jié)果:" + JSONUtil.toJsonStr(returnedMessage)); } }
4.5、添加Consumer,作為消費者
package com.example.springbootrabbitmq.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class PushMessageConsumer { /** * basicAck:表示成功確認,使用此回執(zhí)方法后,消息會被rabbitmq broker 刪除。 * void basicAck(long deliveryTag, boolean multiple) * deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認模式下,我們可以對指定deliveryTag的消息進行ack、nack、reject等操作。 * multiple:是否批量確認,值為 true 則會一次性 ack所有小于當前消息 deliveryTag 的消息。 * */ @RabbitListener(queuesToDeclare = @Queue(value = "my-queue")) @RabbitHandler public void consume(String msg, Channel channel, Message message) throws IOException { try { System.out.println("消費者收到消息:" + msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("deliveryTag:" + message.getMessageProperties().getDeliveryTag()); System.out.println("redelivered:" + message.getMessageProperties().getRedelivered()); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { System.err.println("消息已重復處理失敗,拒絕再次接收!"); /** * 拒絕消息,requeue=false 表示不再重新入隊,如果配置了死信隊列則進入死信隊列 * basicReject:拒絕消息,與basicNack區(qū)別在于不能進行批量操作,其他用法很相似。 * deliveryTag:表示消息投遞序號。 * requeue:值為 true 消息將重新入隊列。 */ channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { System.out.println("消息即將再次返回隊列處理!"); /** * requeue為是否重新回到隊列,true重新入隊 * deliveryTag:表示消息投遞序號。 * multiple:是否批量確認。 * requeue:值為 true 消息將重新入隊列。 */ channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
4.6、啟動程序,訪問
瀏覽器訪問:http://localhost:8080/push/message/test 模擬消息進行推送。
查看控制臺,發(fā)現(xiàn)消費者正常打印出了消費信息。
打開RabbitMQ管理控制臺,可以發(fā)現(xiàn)我們的消息隊列my-queue信息。
既可以查看消息隊列的裝填,消息投遞情況等。
到此這篇關(guān)于一文掌握Springboot集成RabbitMQ的文章就介紹到這了,更多相關(guān)Springboot集成RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot整合rabbitmq的示例代碼
- springboot集成rabbitMQ之對象傳輸?shù)姆椒?/a>
- springboot實現(xiàn)rabbitmq的隊列初始化和綁定
- Springboot 配置RabbitMQ文檔的方法步驟
- SpringBoot集成RabbitMQ的方法(死信隊列)
- SpringBoot+RabbitMq具體使用的幾種姿勢
- SpringBoot中RabbitMQ集群的搭建詳解
- SpringBoot中連接多個RabbitMQ的方法詳解
- SpringBoot實現(xiàn)RabbitMQ監(jiān)聽消息的四種方式
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- SpringBoot整合RabbitMQ實現(xiàn)通配符模式
相關(guān)文章
Java并發(fā)編程ArrayBlockingQueue的使用
ArrayBlockingQueue是一個備受矚目的有界阻塞隊列,本文將全面深入地介紹ArrayBlockingQueue的內(nèi)部機制、使用場景以及最佳實踐,感興趣的可以了解一下2024-08-08springcloud?feign服務之間調(diào)用,date類型轉(zhuǎn)換錯誤的問題
這篇文章主要介紹了springcloud?feign服務之間調(diào)用,date類型轉(zhuǎn)換錯誤的問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03MyBatis之foreach標簽的用法及多種循環(huán)問題
這篇文章主要介紹了MyBatis之foreach標簽的用法及多種循環(huán)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11SpringSecurity表單配置之登錄成功及頁面跳轉(zhuǎn)原理解析
這篇文章主要介紹了SpringSecurity表單配置之登錄成功及頁面跳轉(zhuǎn)原理解析,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2023-12-12