亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot中連接多個RabbitMQ的方法詳解

 更新時間:2023年10月17日 10:48:22   作者:yuhuofei2021  
這篇文章主要介紹了SpringBoot中連接多個RabbitMQ的方法詳解,要實現(xiàn) SpringBoot 連接多個 RabbitMQ,只能自定義重寫一些東西,分別配置才可以,下面一起來走一下試試,需要的朋友可以參考下

1. 前 言

在 SpringBoot 中整合單個 RabbitMQ 使用,是很簡單的,只需要引入依賴,然后在配置里面配置好 MQ 的連接地址、賬號、密碼等信息,然后使用即可。

但如果 MQ 的連接地址是多個,那這種連接方式就不奏效了。

前段時間,我開發(fā)的一個項目就遇到了這樣的問題。那個項目,好幾個關(guān)聯(lián)方,每個關(guān)聯(lián)方用的 MQ 的地址都不相同,也就意味著我這邊要連接幾個 RabbbitMQ 地址。SpringBoot 連接多個 RabbitMQ,怎么搞?

使用默認的連接方式是行不通的,我已經(jīng)試過,而要實現(xiàn) SpringBoot 連接多個 RabbitMQ,只能自定義重寫一些東西,分別配置才可以,下面一起來走一下試試。

2. 重 寫

首先要明確的是,下面的兩個類是需要重寫的:

  • RabbitTemplate:往隊列里面丟消息時,需要用到
  • RabbitAdmin:聲明隊列、聲明交換機、綁定隊列和交換機用到

這里,我定義兩個關(guān)聯(lián)方,一個是 one,一個是 two,分別重寫與它們的連接工廠。

2.1 重寫與關(guān)聯(lián)方one的連接工廠

package com.yuhuofei.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 重寫與關(guān)聯(lián)方one的連接工廠
 * @date 2022/10/3 16:57
 */
@Slf4j
@Configuration
public class OneMQConfig {


    @Value("${one.spring.rabbitmq.host}")
    private String host;

    @Value("${one.spring.rabbitmq.port}")
    private int port;

    @Value("${one.spring.rabbitmq.username}")
    private String username;

    @Value("${one.spring.rabbitmq.password}")
    private String password;

    @Value("${one.spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * 定義與one的連接工廠
     */
    @Bean(name = "oneConnectionFactory")
    @Primary
    public ConnectionFactory oneConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "oneRabbitTemplate")
    @Primary
    public RabbitTemplate oneRabbitTemplate(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate oneRabbitTemplate = new RabbitTemplate(connectionFactory);
        oneRabbitTemplate.setMandatory(true);
        oneRabbitTemplate.setConnectionFactory(connectionFactory);
        oneRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 確認消息送到交換機(Exchange)回調(diào)
             * @param correlationData
             * @param ack
             * @param cause
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("確認消息送到交換機(Exchange)結(jié)果:");
                log.info("相關(guān)數(shù)據(jù):{}", correlationData);
                boolean ret = false;
                if (ack) {
                    log.info("消息發(fā)送到交換機成功, 消息 = {}", correlationData.getId());
                    //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
                    
                } else {
                    log.error("消息發(fā)送到交換機失敗! 消息: {}}; 錯誤原因:cause: {}", correlationData.getId(), cause);
                    //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
                    
                }
            }
        });

        oneRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 只要消息沒有投遞給指定的隊列 就觸發(fā)這個失敗回調(diào)
             * @param message  投遞失敗的消息詳細信息
             * @param replyCode 回復(fù)的狀態(tài)碼
             * @param replyText 回復(fù)的文本內(nèi)容
             * @param exchange 當(dāng)時這個消息發(fā)給那個交換機
             * @param routingKey 當(dāng)時這個消息用那個路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //獲取消息id
                String messageId = message.getMessageProperties().getMessageId();
                // 內(nèi)容
                String result = null;
                try {
                    result = new String(message.getBody(), "UTF-8");
                } catch (Exception e) {
                    log.error("消息發(fā)送失敗{}", e);
                }
                log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result);
                //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
            }
        });
        return oneRabbitTemplate;
    }

    @Bean(name = "oneFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory oneFactory(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory,
                                                          SimpleRabbitListenerContainerFactoryConfigurer configurer) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "oneRabbitAdmin")
    @Primary
    public RabbitAdmin oneRabbitAdmin(@Qualifier("oneConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

}

2.2 重寫與關(guān)聯(lián)方two的連接工廠

package com.yuhuofei.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 重寫與關(guān)聯(lián)方two的連接工廠
 * @date 2022/10/3 17:52
 */
@Slf4j
@Configuration
public class TwoMQConfig {

    @Value("${two.spring.rabbitmq.host}")
    private String host;

    @Value("${two.spring.rabbitmq.port}")
    private int port;

    @Value("${two.spring.rabbitmq.username}")
    private String username;

    @Value("${two.spring.rabbitmq.password}")
    private String password;

    @Value("${two.spring.rabbitmq.virtualHost}")
    private String virtualHost;

    /**
     * 定義與two的連接工廠
     */
    @Bean(name = "twoConnectionFactory")
    public ConnectionFactory twoConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean(name = "twoRabbitTemplate")
    public RabbitTemplate twoRabbitTemplate(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate twoRabbitTemplate = new RabbitTemplate(connectionFactory);
        twoRabbitTemplate.setMandatory(true);
        twoRabbitTemplate.setConnectionFactory(connectionFactory);
        twoRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 確認消息送到交換機(Exchange)回調(diào)
             * @param correlationData
             * @param ack
             * @param cause
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("確認消息送到交換機(Exchange)結(jié)果:");
                log.info("相關(guān)數(shù)據(jù):{}", correlationData);
                boolean ret = false;
                if (ack) {
                    log.info("消息發(fā)送到交換機成功, 消息 = {}", correlationData.getId());
                    //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等

                } else {
                    log.error("消息發(fā)送到交換機失敗! 消息: {}}; 錯誤原因:cause: {}", correlationData.getId(), cause);
                    //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等

                }
            }
        });

        twoRabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            /**
             * 只要消息沒有投遞給指定的隊列 就觸發(fā)這個失敗回調(diào)
             * @param message  投遞失敗的消息詳細信息
             * @param replyCode 回復(fù)的狀態(tài)碼
             * @param replyText 回復(fù)的文本內(nèi)容
             * @param exchange 當(dāng)時這個消息發(fā)給那個交換機
             * @param routingKey 當(dāng)時這個消息用那個路由鍵
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //獲取消息id
                String messageId = message.getMessageProperties().getMessageId();
                // 內(nèi)容
                String result = null;
                try {
                    result = new String(message.getBody(), "UTF-8");
                } catch (Exception e) {
                    log.error("消息發(fā)送失敗{}", e);
                }
                log.error("消息發(fā)送失敗, 消息ID = {}; 消息內(nèi)容 = {}", messageId, result);
                //下面可自定義業(yè)務(wù)邏輯處理,如入庫保存信息等
            }
        });
        return twoRabbitTemplate;
    }

    @Bean(name = "twoFactory")
    public SimpleRabbitListenerContainerFactory twoFactory(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory,
                                                           SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean(name = "twoRabbitAdmin")
    public RabbitAdmin twoRabbitAdmin(@Qualifier("twoConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

2.3 創(chuàng)建隊列及交換機并綁定

package com.yuhuofei.mq.config;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 創(chuàng)建隊列、交換機并綁定
 * @date 2022/10/3 18:15
 */
public class QueueConfig {


    @Resource(name = "oneRabbitAdmin")
    private RabbitAdmin oneRabbitAdmin;

    @Resource(name = "twoRabbitAdmin")
    private RabbitAdmin twoRabbitAdmin;

    @Value("${one.out.queue}")
    private String oneOutQueue;

    @Value("${one.out.queue}")
    private String oneRoutingKey;

    @Value("${two.output.queue}")
    private String twoOutQueue;

    @Value("${two.output.queue}")
    private String twoRoutingKey;

    @Value("${one.topic.exchange.name}")
    private String oneTopicExchange;

    @Value("${two.topic.exchange.name}")
    private String twoTopicExchange;

    @PostConstruct
    public void oneRabbitInit() {
        //聲明交換機
        oneRabbitAdmin.declareExchange(new TopicExchange(oneTopicExchange, true, false));
        //聲明隊列
        oneRabbitAdmin.declareQueue(new Queue(oneOutQueue, true, false, false));
        //綁定隊列及交換機
        oneRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(oneOutQueue, true, false, false))
                .to(new TopicExchange(oneTopicExchange, true, false))
                .with(oneRoutingKey));
    }

    @PostConstruct
    public void twoRabbitInit() {
        //聲明交換機
        twoRabbitAdmin.declareExchange(new TopicExchange(twoTopicExchange, true, false));
        //聲明隊列
        twoRabbitAdmin.declareQueue(new Queue(twoOutQueue, true));
        //綁定隊列及交換機
        twoRabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(twoOutQueue, true, false, false))
                .to(new TopicExchange(twoTopicExchange, true, false))
                .with(twoRoutingKey));
    }
}

2.4 配置信息

這里的配置信息,需要與各自的關(guān)聯(lián)方約定好再配置

# 與關(guān)聯(lián)方one的MQ配置
one.spring.rabbitmq.host=one.mq.com
one.spring.rabbitmq.port=5672
one.spring.rabbitmq.username=xxxxx
one.spring.rabbitmq.password=xxxxx
one.spring.rabbitmq.virtual-host=/xxxxx
one.out.queue=xxxaa.ssssd.cffs.xxxx
one.topic.exchange.name=oneTopExchange

# 與關(guān)聯(lián)方two的MQ配置
two.spring.rabbitmq.host=two.mq.com
two.spring.rabbitmq.port=5672
two.spring.rabbitmq.username=aaaaaaa
two.spring.rabbitmq.password=aaaaaaa
two.spring.rabbitmq.virtualHost=/aaaaaaa
two.out.queue=ddddd.sssss.hhhhh.eeee
two.topic.exchange.name=twoTopExchange

2.5 注意點

在連接多個 MQ 的情況下,需要在某個連接加上 @Primary 注解(見 2.1 中的代碼),表示主連接,默認使用這個連接,如果不加,服務(wù)會起不來

3. 使 用

3.1 作為消費者

由于在前面的 2.3 中,聲明了隊列及交換機,并進行了綁定,那么作為消費者,監(jiān)聽相應(yīng)的隊列,獲取關(guān)聯(lián)方發(fā)送的消息進行處理即可。這里用監(jiān)聽關(guān)聯(lián)方 one 的出隊列做展示,two 的類似。

需要注意的地方是,在監(jiān)聽隊列時,需要指定 ContainerFactory。

package com.yuhuofei.mq.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 監(jiān)聽關(guān)聯(lián)方one的消息
 * @date 2022/10/3 18:38
 */
@Slf4j
@Service
public class OneReceive {

    @RabbitListener(queues = "${one.out.queue}", containerFactory = "oneFactory")
    public void listenOne(Message message, Channel channel) {
        //獲取MQ返回的數(shù)據(jù)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("MQ返回的數(shù)據(jù):{}", data);
        //下面進行業(yè)務(wù)邏輯處理
        
    }
}

3.2 作為生產(chǎn)者

使用之前重寫的 RabbitTemplate ,向各個關(guān)聯(lián)方指定的隊列發(fā)送消息。

package com.yuhuofei.mq.service;

import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author yuhuofei
 * @version 1.0
 * @description 向關(guān)聯(lián)方的隊列發(fā)送消息
 * @date 2022/10/3 18:47
 */
@Slf4j
@Service
public class SendMessage {

    @Resource(name = "oneRabbitTemplate")
    private RabbitTemplate oneRabbitTemplate;

    @Resource(name = "twoRabbitTemplate")
    private RabbitTemplate twoRabbitTemplate;

    public void sendToOneMessage(String messageId, OneMessageConverter message) {
        String exchange = message.getExchange();
        String routingKey = message.getRoutingKey();
        JsonObject data = message.getData();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message info = new Message(data.toString().getBytes(), messageProperties);
        info.getMessageProperties().setMessageId(messageId);
        oneRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));
    }

    public void sendToTwoMessage(String messageId, TwoMessageConverter message) {
        String exchange = message.getExchange();
        String routingKey = message.getRoutingKey();
        JsonObject data = message.getData();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message info = new Message(data.toString().getBytes(), messageProperties);
        info.getMessageProperties().setMessageId(messageId);
        twoRabbitTemplate.convertAndSend(exchange, routingKey, info, new CorrelationData(messageId));
    }
}

到此這篇關(guān)于SpringBoot中連接多個RabbitMQ的方法詳解的文章就介紹到這了,更多相關(guān)SpringBoot多個RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中的雙重檢查(Double-Check)詳解

    Java中的雙重檢查(Double-Check)詳解

    這篇文章主要為大家詳細介紹了Java中的雙重檢查(Double-Check),感興趣的小伙伴們可以參考一下
    2016-02-02
  • IDEA安裝Leetcode插件的教程

    IDEA安裝Leetcode插件的教程

    這篇文章主要介紹了IDEA安裝Leetcode插件的教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • java實現(xiàn)銀行管理系統(tǒng)

    java實現(xiàn)銀行管理系統(tǒng)

    這篇文章主要為大家詳細介紹了java實現(xiàn)銀行管理系統(tǒng),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-12-12
  • Java 實現(xiàn)常見的非對稱加密算法

    Java 實現(xiàn)常見的非對稱加密算法

    這篇文章主要介紹了Java 實現(xiàn)常見的非對稱加密算法,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2020-11-11
  • SpringBoot?快速實現(xiàn)?api?接口加解密功能

    SpringBoot?快速實現(xiàn)?api?接口加解密功能

    在項目中,為了保證數(shù)據(jù)的安全,我們常常會對傳遞的數(shù)據(jù)進行加密,Spring?Boot接口加密,可以對返回值、參數(shù)值通過注解的方式自動加解密,這篇文章主要介紹了SpringBoot?快速實現(xiàn)?api?接口加解密功能,感興趣的朋友一起看看吧
    2023-10-10
  • Java實現(xiàn)提取圖片邊緣的示例代碼

    Java實現(xiàn)提取圖片邊緣的示例代碼

    這篇文章主要為大家詳細介紹了如何利用Java實現(xiàn)提取圖片邊緣的功能,文中的示例代碼講解詳細,具有一定的學(xué)習(xí)價值,感興趣的小伙伴可以了解一下
    2023-06-06
  • springBoot集成redis(jedis)的實現(xiàn)示例

    springBoot集成redis(jedis)的實現(xiàn)示例

    Redis是我們Java開發(fā)中,使用頻次非常高的一個nosql數(shù)據(jù)庫,本文主要介紹了springBoot集成redis(jedis)的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下
    2023-09-09
  • Spring攔截器實現(xiàn)鑒權(quán)的示例代碼

    Spring攔截器實現(xiàn)鑒權(quán)的示例代碼

    本文主要介紹了Spring攔截器實現(xiàn)鑒權(quán)的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • mybatis中${}和#{}的區(qū)別以及底層原理分析

    mybatis中${}和#{}的區(qū)別以及底層原理分析

    這篇文章主要介紹了mybatis中${}和#{}的區(qū)別以及底層原理,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • 基于SpringBoot集成測試遠程連接Redis服務(wù)的教程詳解

    基于SpringBoot集成測試遠程連接Redis服務(wù)的教程詳解

    這篇文章主要介紹了基于SpringBoot集成測試遠程連接的Redis服務(wù)的相關(guān)知識,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-03-03

最新評論