SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
一、RabbitTemplate 的使用
1.【導(dǎo)入依賴】
<!-- rabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.1</version> </dependency>
2.【添加配置】
rabbitmq: host: #ip地址 port: 5672 #端口 username: guest password: guest virtual-host: / listener: simple: prefetch: 1 # 默認(rèn)每次取出一條消息消費(fèi), 消費(fèi)完成取下一條 acknowledge-mode: manual # 設(shè)置消費(fèi)端手動(dòng)ack確認(rèn) retry: enabled: true # 是否支持重試 publisher-confirm-type: correlated #確認(rèn)消息已發(fā)送到交換機(jī)(Exchange) publisher-returns: true #確認(rèn)消息已發(fā)送到隊(duì)列(Queue)
3.【點(diǎn)對(duì)點(diǎn)通信(隊(duì)列模式)(Point-to-Point Messaging)】
使用方式:
這種方式也被稱為隊(duì)列(Queue)模型。消息發(fā)送者(Producer)發(fā)送消息到隊(duì)列,然后消息接收者(Consumer)從隊(duì)列中獲取消息進(jìn)行處理。這種模型下,每個(gè)消息只有一個(gè)消費(fèi)者可以接收,確保消息的可靠傳遞和順序處理。
代碼示例: 生產(chǎn)者
/** * 第一種模型: 簡(jiǎn)單模型 * 一個(gè)消息生產(chǎn)者 一個(gè)隊(duì)列 一個(gè)消費(fèi)者 * @return */ @GetMapping("hello/world") public void helloWorld() { SysUser sysUser = new SysUser(); // 發(fā)送消息 // 第一個(gè)參數(shù): String routingKey 路由規(guī)則 【交換機(jī) 和隊(duì)列的綁定規(guī)則 】 隊(duì)列名稱 // 第二個(gè)參數(shù): object message 消息的內(nèi)容 // rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!"); /// MessagePostProcessor 消息包裝器 如果需要對(duì)消息進(jìn)行包裝 rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!", message -> { // 設(shè)置唯一的標(biāo)識(shí) message.getMessageProperties().setMessageId(UUID.randomUUID().toString()); return message; });
消費(fèi)者
import com.rabbitmq.client.Channel; import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @Component @Log4j2 public class HelloWorldConsumer { @Autowired private StringRedisTemplate redisTemplate; /** * 監(jiān)聽 hello_world_queue 隊(duì)列消費(fèi)消息 * queues 監(jiān)聽隊(duì)列的名稱 要求這個(gè)隊(duì)列必須是已經(jīng)存在的隊(duì)列 * queuesToDeclare 監(jiān)聽隊(duì)列 如果這個(gè)隊(duì)列不存在 則 rabbitMQ 中 RabbitAdmin 會(huì)幫助去構(gòu)建這個(gè)隊(duì)列 */ @RabbitListener(queuesToDeclare = @Queue("hello_world_queue")) public void helloWorldConsumer(String msg, Message message, Channel channel) { // 獲取消息的唯一標(biāo)識(shí) String messageId = message.getMessageProperties().getMessageId(); // 將消息添加到 Redis的set集合中 set 不能重復(fù)的 方法的返回值 添加成功的數(shù)量 Long count = redisTemplate.opsForSet().add("hello_world_queue", messageId); if (count != null && count == 1) { // 沒有消費(fèi)過 正常消費(fèi) log.info("hello_world_queue隊(duì)列消費(fèi)者接收到了消息,消息內(nèi)容:{}", message); } } }
4.【發(fā)布/訂閱模式(Publish/Subscribe Messaging)】
使用方式:
在發(fā)布/訂閱模式中,消息發(fā)送者將消息發(fā)布到交換機(jī)(Exchange),而不是直接發(fā)送到隊(duì)列。交換機(jī)負(fù)責(zé)將消息路由到一個(gè)或多個(gè)綁定的隊(duì)列中。每個(gè)訂閱者(Subscriber)可以選擇訂閱它感興趣的消息隊(duì)列,從而接收消息。
代碼示例: 生產(chǎn)者
/** * 工作隊(duì)列 * 一個(gè)生產(chǎn)者 一個(gè)隊(duì)列 多個(gè)消費(fèi)者 */ @GetMapping("work/queue") public void workQueue() { for (int i = 1; i <= 10; i++) { rabbitTemplate.convertAndSend("work_queue", i + "hello work queue!"); } }
消費(fèi)者
import lombok.extern.log4j.Log4j2; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Log4j2 public class WorkQueueConsumer { /*** * 消費(fèi)者1 * @param message */ @RabbitListener(queuesToDeclare = @Queue("work_queue")) public void workQueueConsumer(String message) throws InterruptedException { Thread.sleep(200); log.info("work_queue隊(duì)列消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message); } /*** * 消費(fèi)者2 * @param message */ @RabbitListener(queuesToDeclare = @Queue("work_queue")) public void workQueueConsumer2(String message) throws InterruptedException { Thread.sleep(400); log.info("work_queue隊(duì)列消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message); } }
5.【工作隊(duì)列模式(Work Queues)】
使用方式:
工作隊(duì)列模式也稱為任務(wù)隊(duì)列(Task Queues),它可以用來實(shí)現(xiàn)任務(wù)的異步處理。多個(gè)工作者(Worker)同時(shí)監(jiān)聽同一個(gè)隊(duì)列,當(dāng)有新的任務(wù)消息被發(fā)送到隊(duì)列中時(shí),空閑的工作者會(huì)獲取并處理這些任務(wù),確保任務(wù)能夠并行處理而不會(huì)重復(fù)執(zhí)行。
代碼示例: 生產(chǎn)者
/** * 發(fā)布訂閱 * 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) fanout */ @GetMapping("publish/subscribe") public void publishSubscribe() { // 第一個(gè)參數(shù): 交換機(jī)的名稱 沒有要求 // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 如果是發(fā)布訂閱模式 那么這個(gè)規(guī)則默認(rèn)不寫 只需要交換機(jī)和隊(duì)列綁定即可不需要規(guī)則 // 第三個(gè)參數(shù): 消息內(nèi)容 rabbitTemplate.convertAndSend("publish_subscribe_exchange", "", "hello publisher subscribe!!"); }
消費(fèi)者
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class PublisherSubscribeConsumer { private static final Logger log = LoggerFactory.getLogger(PublisherSubscribeConsumer.class); /** * 發(fā)布訂閱模型消費(fèi)者 * * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_01"), exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT))) public void publisherSubscribe(String message) { log.info("發(fā)布訂閱模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message); } /** * 發(fā)布訂閱模型消費(fèi)者 * * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_02"), exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT))) public void publisherSubscribe2(String message) { log.info("發(fā)布訂閱模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message); } }
6.【路由模式(Routing)】
使用方式:
路由模式允許發(fā)送者根據(jù)消息的路由鍵(Routing Key)將消息路由到特定的隊(duì)列。發(fā)送者將消息發(fā)送到交換機(jī),并且通過設(shè)置不同的路由鍵,使消息能夠被交換機(jī)路由到不同的隊(duì)列。消費(fèi)者可以根據(jù)需要選擇監(jiān)聽哪些隊(duì)列來接收消息。
代碼示例: 生產(chǎn)者
/** * 路由模型 * 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) direct */ @GetMapping("routing") public void routing() { // 第一個(gè)參數(shù): 交換機(jī)的名稱 沒有要求 // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 字符串 隨意 // 第三個(gè)參數(shù): 消息內(nèi)容 rabbitTemplate.convertAndSend("routing_exchange", "aaa", "hello routing!!"); }
消費(fèi)者
import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Log4j2 public class RoutingConsumer { /** * 路由模型消費(fèi)者 * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_01"), exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT), key = { "abc", "error", "info" })) public void routingConsumer(String message) { log.info("路由模型消費(fèi)者1接收到了消息,消息內(nèi)容:{}", message); } /** * 路由模型消費(fèi)者 * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_02"), exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT), key = { "aaa", "ccc", "waadaffas" })) public void routingConsumer2(String message) { log.info("路由模型消費(fèi)者2接收到了消息,消息內(nèi)容:{}", message); } /** * 路由模型消費(fèi)者 * @param message */ @RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_03"), exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT), key = { "bbbb", "asdfasd", "asdfasdf" })) public void routingConsumer3(String message) { log.info("路由模型消費(fèi)者3接收到了消息,消息內(nèi)容:{}", message); } }
7.【主題模式(Topics)】
使用方式:
主題模式是路由模式的一種擴(kuò)展,它允許發(fā)送者根據(jù)消息的多個(gè)屬性(如主題)將消息路由到一個(gè)或多個(gè)隊(duì)列。主題交換機(jī)(Topic Exchange)使用通配符匹配路由鍵與隊(duì)列綁定鍵的模式,從而實(shí)現(xiàn)更靈活的消息路由和過濾。
代碼示例: 生產(chǎn)者
/** * 主題模型 * 一個(gè)生產(chǎn)者 多個(gè)隊(duì)列 多個(gè)消費(fèi)者 涉及 到交換機(jī) topic */ @GetMapping("topic") public void topic() { // 第一個(gè)參數(shù): 交換機(jī)的名稱 沒有要求 // 第二個(gè)參數(shù): 交換機(jī)和隊(duì)列的綁定規(guī)則 多個(gè)單詞 以 “.” 拼起來 // 第三個(gè)參數(shù): 消息內(nèi)容 rabbitTemplate.convertAndSend("topic_exchange", "bwie.age.name", "hello topic!!"); }
消費(fèi)者
import lombok.extern.log4j.Log4j2; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Log4j2 public class TopicConsumer { /** * * 表示任意一個(gè)單詞 * # 表示任意一個(gè)單詞 或 多個(gè) */ @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_01"), exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC), key = { "abc.*", "error.*.info", "#.name" })) public void topicConsumer(String message) { log.info("xxxxxxxxx1"); } /** * * 表示任意一個(gè)單詞 * # 表示任意一個(gè)單詞 或 多個(gè) */ @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_02"), exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC), key = { "abc.*", "username" })) public void topicConsumer2(String message) { log.info("xxxxxxxxx2"); } /** * * 表示任意一個(gè)單詞 * # 表示任意一個(gè)單詞 或 多個(gè) */ @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_03"), exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC), key = { "bwie.*", "error.*.info" })) public void topicConsumer3(String message) { log.info("xxxxxxxxx3"); } }
到此這篇關(guān)于SpringBoot 整合 RabbitMQ 的使用的文章就介紹到這了,更多相關(guān)SpringBoot 整合 RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot整合rabbitmq的示例代碼
- springboot集成rabbitMQ之對(duì)象傳輸?shù)姆椒?/a>
- springboot實(shí)現(xiàn)rabbitmq的隊(duì)列初始化和綁定
- Springboot 配置RabbitMQ文檔的方法步驟
- SpringBoot集成RabbitMQ的方法(死信隊(duì)列)
- SpringBoot+RabbitMq具體使用的幾種姿勢(shì)
- SpringBoot中RabbitMQ集群的搭建詳解
- SpringBoot中連接多個(gè)RabbitMQ的方法詳解
- 一文掌握Springboot集成RabbitMQ的方法
- SpringBoot實(shí)現(xiàn)RabbitMQ監(jiān)聽消息的四種方式
- SpringBoot整合RabbitMQ實(shí)現(xiàn)通配符模式
相關(guān)文章
SpringBoot+Shiro+Redis+Mybatis-plus 實(shí)戰(zhàn)項(xiàng)目及問題小結(jié)
最近也是一直在保持學(xué)習(xí)課外拓展技術(shù),所以想自己做一個(gè)簡(jiǎn)單小項(xiàng)目,于是就有了這個(gè)快速上手 Shiro 和 Redis 的小項(xiàng)目,說白了就是拿來練手調(diào)調(diào) API,然后做完后拿來總結(jié)的小項(xiàng)目,感興趣的朋友一起看看吧2021-04-04Hibernate實(shí)現(xiàn)悲觀鎖和樂觀鎖代碼介紹
這篇文章主要介紹了Hibernate實(shí)現(xiàn)悲觀鎖和樂觀鎖的有關(guān)內(nèi)容,涉及hibernate的隔離機(jī)制,以及實(shí)現(xiàn)悲觀鎖和樂觀鎖的代碼實(shí)現(xiàn),需要的朋友可以了解下。2017-09-09Java數(shù)據(jù)結(jié)構(gòu)之鏈表詳解
本篇文章我們將講解一種新型的數(shù)據(jù)結(jié)構(gòu)—鏈表,鏈表是一種使用廣泛的通用數(shù)據(jù)結(jié)構(gòu),它可以用來作為實(shí)現(xiàn)棧,隊(duì)列等數(shù)據(jù)結(jié)構(gòu)的基礎(chǔ).文中有非常詳細(xì)的介紹,需要的朋友可以參考下2021-05-05SpringBoot接口返回結(jié)果封裝方法實(shí)例詳解
在實(shí)際項(xiàng)目中,一般會(huì)把結(jié)果放在一個(gè)封裝類中,封裝類中包含http狀態(tài)值,狀態(tài)消息,以及實(shí)際的數(shù)據(jù)。這里主要記錄兩種方式,通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2021-09-09Spring ApplicationContext接口功能詳細(xì)介紹
ApplicationContext是Spring應(yīng)用程序中的中央接口,由于繼承了多個(gè)組件,使得ApplicationContext擁有了許多Spring的核心功能,如獲取bean組件,注冊(cè)監(jiān)聽事件,加載資源文件等2023-02-02解決java.lang.Error: Unresolved compilation pro
這篇文章主要介紹了解決java.lang.Error: Unresolved compilation problems:問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03