SpringBoot中連接多個RabbitMQ的方法詳解
1. 前 言
在 SpringBoot 中整合單個 RabbitMQ 使用,是很簡單的,只需要引入依賴,然后在配置里面配置好 MQ 的連接地址、賬號、密碼等信息,然后使用即可。
但如果 MQ 的連接地址是多個,那這種連接方式就不奏效了。
前段時間,我開發(fā)的一個項目就遇到了這樣的問題。那個項目,好幾個關(guān)聯(lián)方,每個關(guān)聯(lián)方用的 MQ 的地址都不相同,也就意味著我這邊要連接幾個 RabbbitMQ 地址。SpringBoot 連接多個 RabbitMQ,怎么搞?
使用默認的連接方式是行不通的,我已經(jīng)試過,而要實現(xiàn) SpringBoot 連接多個 RabbitMQ,只能自定義重寫一些東西,分別配置才可以,下面一起來走一下試試。
2. 重 寫
首先要明確的是,下面的兩個類是需要重寫的:
- RabbitTemplate:往隊列里面丟消息時,需要用到
- RabbitAdmin:聲明隊列、聲明交換機、綁定隊列和交換機用到
這里,我定義兩個關(guān)聯(lián)方,一個是 one,一個是 two,分別重寫與它們的連接工廠。
2.1 重寫與關(guān)聯(lián)方one的連接工廠
package com.yuhuofei.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @author yuhuofei * @version 1.0 * @description 重寫與關(guān)聯(lián)方one的連接工廠 * @date 2022/10/3 16:57 */ @Slf4j @Configuration public class OneMQConfig { @Value("${one.spring.rabbitmq.host}") private String host; @Value("${one.spring.rabbitmq.port}") private int port; @Value("${one.spring.rabbitmq.username}") private String username; @Value("${one.spring.rabbitmq.password}") private String password; @Value("${one.spring.rabbitmq.virtual-host}") private String virtualHost; /** * 定義與one的連接工廠 */ @Bean(name = "oneConnectionFactory") @Primary public ConnectionFactory oneConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "oneRabbitTemplate") @Primary public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory); oneRabbitTemplate.setMandatory(true); oneRabbitTemplate.setConnectionFactory(connectionFactory); oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 確認消息送到交換機(Exchange)回調(diào) * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("確認消息送到交換機(Exchange)結(jié)果:"); log.info("相關(guān)數(shù)據(jù):{}", correlationData); boolean ret = false; if (ack) { log.info("消息發(fā)送到交換機成功, 消息 = {}", correlationData.getId()); //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等 } else { log.error("消息發(fā)送到交換機失敗! 消息: {}}; 錯誤原因:cause: {}", correlationData.getId(), cause); //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等 } } }); oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息沒有投遞給指定的隊列 就觸發(fā)這個失敗回調(diào) * @param message 投遞失敗的消息詳細信息 * @param replyCode 回復(fù)的狀態(tài)碼 * @param replyText 回復(fù)的文本內(nèi)容 * @param exchange 當(dāng)時這個消息發(fā)給那個交換機 * @param routingKey 當(dāng)時這個消息用那個路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //獲取消息id String messageId = message.getMessageProperties().getMessageId(); // 內(nèi)容 String result = null; try { result = new String(message.getBody(), "UTF-8"); } catch (Exception e) { log.error("消息發(fā)送失敗{}", e); } log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result); //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等 } }); return oneRabbitTemplate; } @Bean(name = "oneFactory") @Primary public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "oneRabbitAdmin") @Primary public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.2 重寫與關(guān)聯(lián)方two的連接工廠
package com.yuhuofei.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author yuhuofei * @version 1.0 * @description 重寫與關(guān)聯(lián)方two的連接工廠 * @date 2022/10/3 17:52 */ @Slf4j @Configuration public class TwoMQConfig { @Value("${two.spring.rabbitmq.host}") private String host; @Value("${two.spring.rabbitmq.port}") private int port; @Value("${two.spring.rabbitmq.username}") private String username; @Value("${two.spring.rabbitmq.password}") private String password; @Value("${two.spring.rabbitmq.virtualHost}") private String virtualHost; /** * 定義與two的連接工廠 */ @Bean(name = "twoConnectionFactory") public ConnectionFactory twoConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "twoRabbitTemplate") public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory); twoRabbitTemplate.setMandatory(true); twoRabbitTemplate.setConnectionFactory(connectionFactory); twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 確認消息送到交換機(Exchange)回調(diào) * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("確認消息送到交換機(Exchange)結(jié)果:"); log.info("相關(guān)數(shù)據(jù):{}", correlationData); boolean ret = false; if (ack) { log.info("消息發(fā)送到交換機成功, 消息 = {}", correlationData.getId()); //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等 } else { log.error("消息發(fā)送到交換機失敗! 消息: {}}; 錯誤原因:cause: {}", correlationData.getId(), cause); //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等 } } }); twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息沒有投遞給指定的隊列 就觸發(fā)這個失敗回調(diào) * @param message 投遞失敗的消息詳細信息 * @param replyCode 回復(fù)的狀態(tài)碼 * @param replyText 回復(fù)的文本內(nèi)容 * @param exchange 當(dāng)時這個消息發(fā)給那個交換機 * @param routingKey 當(dāng)時這個消息用那個路由鍵 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { //獲取消息id String messageId = message.getMessageProperties().getMessageId(); // 內(nèi)容 String result = null; try { result = new String(message.getBody(), "UTF-8"); } catch (Exception e) { log.error("消息發(fā)送失敗{}", e); } log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result); //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等 } }); return twoRabbitTemplate; } @Bean(name = "twoFactory") public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "twoRabbitAdmin") public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
2.3 創(chuàng)建隊列及交換機并綁定
package com.yuhuofei.mq.config; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * @author yuhuofei * @version 1.0 * @description 創(chuàng)建隊列、交換機并綁定 * @date 2022/10/3 18:15 */ public class QueueConfig { @Resource(name = "oneRabbitAdmin") private RabbitAdmin oneRabbitAdmin; @Resource(name = "twoRabbitAdmin") private RabbitAdmin twoRabbitAdmin; @Value("${one.out.queue}") private String oneOutQueue; @Value("${one.out.queue}") private String oneRoutingKey; @Value("${two.output.queue}") private String twoOutQueue; @Value("${two.output.queue}") private String twoRoutingKey; @Value("${one.topic.exchange.name}") private String oneTopicExchange; @Value("${two.topic.exchange.name}") private String twoTopicExchange; @PostConstruct public void oneRabbitInit() { //聲明交換機 oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false)); //聲明隊列 oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false)); //綁定隊列及交換機 oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false)) .to(new TopicExchange(oneTopicExchange, true, false)) .with(oneRoutingKey)); } @PostConstruct public void twoRabbitInit() { //聲明交換機 twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false)); //聲明隊列 twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true)); //綁定隊列及交換機 twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false)) .to(new TopicExchange(twoTopicExchange, true, false)) .with(twoRoutingKey)); } }
2.4 配置信息
這里的配置信息,需要與各自的關(guān)聯(lián)方約定好再配置
# 與關(guān)聯(lián)方one的MQ配置 one.spring.rabbitmq.host=one.mq.com one.spring.rabbitmq.port=5672 one.spring.rabbitmq.username=xxxxx one.spring.rabbitmq.password=xxxxx one.spring.rabbitmq.virtual-host=/xxxxx one.out.queue=xxxaa.ssssd.cffs.xxxx one.topic.exchange.name=oneTopExchange # 與關(guān)聯(lián)方two的MQ配置 two.spring.rabbitmq.host=two.mq.com two.spring.rabbitmq.port=5672 two.spring.rabbitmq.username=aaaaaaa two.spring.rabbitmq.password=aaaaaaa two.spring.rabbitmq.virtualHost=/aaaaaaa two.out.queue=ddddd.sssss.hhhhh.eeee two.topic.exchange.name=twoTopExchange
2.5 注意點
在連接多個 MQ 的情況下,需要在某個連接加上 @Primary 注解(見 2.1 中的代碼),表示主連接,默認使用這個連接,如果不加,服務(wù)會起不來
3. 使 用
3.1 作為消費者
由于在前面的 2.3 中,聲明了隊列及交換機,并進行了綁定,那么作為消費者,監(jiān)聽相應(yīng)的隊列,獲取關(guān)聯(lián)方發(fā)送的消息進行處理即可。這里用監(jiān)聽關(guān)聯(lián)方 one 的出隊列做展示,two 的類似。
需要注意的地方是,在監(jiān)聽隊列時,需要指定 ContainerFactory。
package com.yuhuofei.mq.service; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; /** * @author yuhuofei * @version 1.0 * @description 監(jiān)聽關(guān)聯(lián)方one的消息 * @date 2022/10/3 18:38 */ @Slf4j @Service public class OneReceive { @RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory") public void listenOne(Message message, Channel channel) { //獲取MQ返回的數(shù)據(jù) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); String data = new String(message.getBody(), StandardCharsets.UTF_8); log.info("MQ返回的數(shù)據(jù):{}", data); //下面進行業(yè)務(wù)邏輯處理 } }
3.2 作為生產(chǎn)者
使用之前重寫的 RabbitTemplate ,向各個關(guān)聯(lián)方指定的隊列發(fā)送消息。
package com.yuhuofei.mq.service; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * @author yuhuofei * @version 1.0 * @description 向關(guān)聯(lián)方的隊列發(fā)送消息 * @date 2022/10/3 18:47 */ @Slf4j @Service public class SendMessage { @Resource(name = "oneRabbitTemplate") private RabbitTemplate oneRabbitTemplate; @Resource(name = "twoRabbitTemplate") private RabbitTemplate twoRabbitTemplate; public void sendToOneMessage(String messageId, OneMessageConverter message) { String exchange = message.getExchange(); String routingKey = message.getRoutingKey(); JsonObject data = message.getData(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message info = new Message(data.toString().getBytes(), messageProperties); info.getMessageProperties().setMessageId(messageId); oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId)); } public void sendToTwoMessage(String messageId, TwoMessageConverter message) { String exchange = message.getExchange(); String routingKey = message.getRoutingKey(); JsonObject data = message.getData(); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message info = new Message(data.toString().getBytes(), messageProperties); info.getMessageProperties().setMessageId(messageId); twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId)); } }
到此這篇關(guān)于SpringBoot中連接多個RabbitMQ的方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot多個RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot整合rabbitmq的示例代碼
- springboot集成rabbitMQ之對象傳輸?shù)姆椒?/a>
- springboot實現(xiàn)rabbitmq的隊列初始化和綁定
- SpringBoot+RabbitMq具體使用的幾種姿勢
- SpringBoot使用RabbitMQ延時隊列(小白必備)
- springboot + rabbitmq 如何實現(xiàn)消息確認機制(踩坑經(jīng)驗)
- SpringBoot中RabbitMQ集群的搭建詳解
- SpringBoot實現(xiàn)RabbitMQ監(jiān)聽消息的四種方式
- SpringBoot整合RabbitMQ之路由模式的實現(xiàn)
相關(guān)文章
SpringBoot?快速實現(xiàn)?api?接口加解密功能
在項目中,為了保證數(shù)據(jù)的安全,我們常常會對傳遞的數(shù)據(jù)進行加密,Spring?Boot接口加密,可以對返回值、參數(shù)值通過注解的方式自動加解密,這篇文章主要介紹了SpringBoot?快速實現(xiàn)?api?接口加解密功能,感興趣的朋友一起看看吧2023-10-10springBoot集成redis(jedis)的實現(xiàn)示例
Redis是我們Java開發(fā)中,使用頻次非常高的一個nosql數(shù)據(jù)庫,本文主要介紹了springBoot集成redis(jedis)的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下2023-09-09Spring攔截器實現(xiàn)鑒權(quán)的示例代碼
本文主要介紹了Spring攔截器實現(xiàn)鑒權(quán)的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07mybatis中${}和#{}的區(qū)別以及底層原理分析
這篇文章主要介紹了mybatis中${}和#{}的區(qū)別以及底層原理,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05基于SpringBoot集成測試遠程連接Redis服務(wù)的教程詳解
這篇文章主要介紹了基于SpringBoot集成測試遠程連接的Redis服務(wù)的相關(guān)知識,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-03-03