亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

詳解SpringBoot中使用RabbitMQ的RPC功能

 更新時(shí)間:2021年11月15日 16:20:07   作者:黑瑩de希望  
這篇文章主要介紹了詳解SpringBoot中使用RabbitMQ的RPC功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

一、RabbitMQ的RPC簡介

實(shí)際業(yè)務(wù)中,有的時(shí)候我們還需要等待消費(fèi)者返回結(jié)果給我們,或者是說我們需要消費(fèi)者上的一個(gè)功能、一個(gè)方法或是一個(gè)接口返回給我們相應(yīng)的值,而往往大型的系統(tǒng)軟件,生產(chǎn)者跟消費(fèi)者之間都是相互獨(dú)立的兩個(gè)系統(tǒng),部署在兩個(gè)不同的電腦上,不能通過直接對(duì)象.方法的形式獲取想要的結(jié)果,這時(shí)候我們就需要用到RPC(Remote Procedure Call)遠(yuǎn)程過程調(diào)用方式。
RabbitMQ實(shí)現(xiàn)RPC的方式很簡單,生產(chǎn)者發(fā)送一條帶有標(biāo)簽(消息ID(correlation_id)+回調(diào)隊(duì)列名稱)的消息到發(fā)送隊(duì)列,消費(fèi)者(也稱RPC服務(wù)端)從發(fā)送隊(duì)列獲取消息并處理業(yè)務(wù),解析標(biāo)簽的信息將業(yè)務(wù)結(jié)果發(fā)送到指定的回調(diào)隊(duì)列,生產(chǎn)者從回調(diào)隊(duì)列中根據(jù)標(biāo)簽的信息獲取發(fā)送消息的返回結(jié)果。

在這里插入圖片描述

如圖,客戶端C發(fā)送消息,指定消息的ID=rpc_id,回調(diào)響應(yīng)的隊(duì)列名稱為rpc_resp,消息從C發(fā)送到rpc_request隊(duì)列,服務(wù)端S獲取消息業(yè)務(wù)處理之后,將correlation_id附加到響應(yīng)的結(jié)果發(fā)送到指定的回調(diào)隊(duì)列rpc_resp中,客戶端從回調(diào)隊(duì)列獲取消息,匹配與發(fā)送消息的correlation_id相同的值為消息應(yīng)答結(jié)果。

二、SpringBoot中使用RabbitMQ的RPC功能

注意:springboot中使用的時(shí)候,correlation_id為系統(tǒng)自動(dòng)生成的,reply_to在加載AmqpTemplate實(shí)例的時(shí)候設(shè)置的。

實(shí)例:
說明:隊(duì)列1為發(fā)送隊(duì)列,隊(duì)列2為返回隊(duì)列

1.先配置rabbitmq

package com.ws.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/*
 * rabbitMQ配置類
 */
@Configuration
public class RabbitMQConfig {
	public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    
    @Autowired
    ConnectionFactory connectionFactory;
    
    @Bean(name = "connectionFactory")
    public ConnectionFactory connectionFactory() {
    	CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    	connectionFactory.setHost(host);
    	connectionFactory.setPort(port);
    	connectionFactory.setUsername(username);
    	connectionFactory.setPassword(password);
    	connectionFactory.setVirtualHost("/");
    	return connectionFactory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate() {
    	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    	//設(shè)置reply_to(返回隊(duì)列,只能在這設(shè)置)
    	rabbitTemplate.setReplyAddress(TOPIC_QUEUE2);
    	rabbitTemplate.setReplyTimeout(60000);
    	return rabbitTemplate;
    }
    //返回隊(duì)列監(jiān)聽器(必須有)
    @Bean(name="replyMessageListenerContainer")
    public SimpleMessageListenerContainer createReplyListenerContainer() {
         SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
         listenerContainer.setConnectionFactory(connectionFactory);
         listenerContainer.setQueueNames(TOPIC_QUEUE2);
         listenerContainer.setMessageListener(rabbitTemplate());
         return listenerContainer;
    }
    

    
    //創(chuàng)建隊(duì)列
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2);
    }
    
    //創(chuàng)建交換機(jī)
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    
    //交換機(jī)與隊(duì)列進(jìn)行綁定
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_QUEUE1);
    }
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_QUEUE2);
    }
}

2.發(fā)送消息并同步等待返回值

@Autowired
private RabbitTemplate rabbitTemplate;


//報(bào)文body
String sss = "報(bào)文的內(nèi)容";
//封裝Message
Message msg = this.con(sss);
log.info("客戶端--------------------"+msg.toString());
//使用sendAndReceive方法完成rpc調(diào)用
Message message=rabbitTemplate.sendAndReceive(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE1, msg);
//提取rpc回應(yīng)內(nèi)容body
String response = new String(message.getBody());
log.info("回應(yīng):" + response);
log.info("rpc完成---------------------------------------------");


public Message con(String s) {
	MessageProperties mp = new MessageProperties();
	byte[] src = s.getBytes(Charset.forName("UTF-8"));
	//mp.setReplyTo("adsdas");   加載AmqpTemplate時(shí)設(shè)置,這里設(shè)置沒用
	//mp.setCorrelationId("2222");   系統(tǒng)生成,這里設(shè)置沒用
	mp.setContentType("application/json");
	mp.setContentEncoding("UTF-8");
	mp.setContentLength((long)s.length());
	return new Message(src, mp);
} 

3.寫消費(fèi)者

package com.ws.listener.mq;

import java.nio.charset.Charset;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.ws.common.RabbitMQConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class Receiver {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE1)
	public void receiveTopic1(Message msg) {
		log.info("隊(duì)列1:"+msg.toString());
		String msgBody = new String(msg.getBody());
		//數(shù)據(jù)處理,返回的Message
		Message repMsg = con(msgBody+"返回了", msg.getMessageProperties().getCorrelationId());
		
		rabbitTemplate.send(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE2, repMsg);
		
    }
	@RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE2)
	public void receiveTopic2(Message msg) {
		log.info("隊(duì)列2:"+msg.toString());
		
    }
	
	public Message con(String s, String id) {
		MessageProperties mp = new MessageProperties();
		byte[] src = s.getBytes(Charset.forName("UTF-8"));
		mp.setContentType("application/json");
		mp.setContentEncoding("UTF-8");
		mp.setCorrelationId(id);
		
		return new Message(src, mp);
	} 
}

日志打?。?/p>

2019-06-26 17:11:16.607 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 客戶端--------------------(Body:‘報(bào)文的內(nèi)容' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=5, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])

2019-06-26 17:11:16.618 [SimpleAsyncTaskExecutor-1] INFO com.ws.listener.mq.Receiver - 隊(duì)列1:(Body:‘報(bào)文的內(nèi)容' MessageProperties [headers={}, correlationId=1, replyTo=topic.queue2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=topic.queue1, deliveryTag=1, consumerTag=amq.ctag-8IzlhblYmTebqUYd-uferw, consumerQueue=topic.queue1])

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 回應(yīng):報(bào)文的內(nèi)容返回了

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - rpc完成---------------------------------------------

到此這篇關(guān)于SpringBoot中使用RabbitMQ的RPC功能的文章就介紹到這了,更多相關(guān)SpringBoot使用RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot集成minio實(shí)現(xiàn)文件上傳和刪除功能

    SpringBoot集成minio實(shí)現(xiàn)文件上傳和刪除功能

    這篇文章主要介紹了SpringBoot集成minio實(shí)現(xiàn)文件上傳和刪除功能,詳細(xì)介紹每個(gè)功能的實(shí)現(xiàn)步驟和代碼示例,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-11-11
  • Spring的跨域的幾個(gè)方案

    Spring的跨域的幾個(gè)方案

    這篇文章主要介紹了Spring的跨域的幾個(gè)方案,CrossOrigin、addCorsMappings、CorsFIlter等方案,具有一定的參考價(jià)值,需要的小伙伴可以參考一下,希望對(duì)你有所幫助
    2022-02-02
  • RocketMQ特性Broker存儲(chǔ)事務(wù)消息實(shí)現(xiàn)

    RocketMQ特性Broker存儲(chǔ)事務(wù)消息實(shí)現(xiàn)

    這篇文章主要為大家介紹了RocketMQ特性Broker存儲(chǔ)事務(wù)消息實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • Spring Cloud Stream微服務(wù)消息框架原理及實(shí)例解析

    Spring Cloud Stream微服務(wù)消息框架原理及實(shí)例解析

    這篇文章主要介紹了Spring Cloud Stream微服務(wù)消息框架原理及實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06
  • 通過面試題解析 Java 類加載機(jī)制

    通過面試題解析 Java 類加載機(jī)制

    類加載是 Java 語言的一個(gè)創(chuàng)新,也是 Java 語言流行的重要原因之一。它使得 Java 類可以被動(dòng)態(tài)加載到 Java 虛擬機(jī)中并執(zhí)行。下面小編和大家來一起學(xué)習(xí)一下吧
    2019-05-05
  • SpringBoot部署到騰訊云的實(shí)現(xiàn)示例

    SpringBoot部署到騰訊云的實(shí)現(xiàn)示例

    記錄一下自己第一次部署springboot項(xiàng)目,本文主要介紹了SpringBoot部署到騰訊云的實(shí)現(xiàn)示例,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-08-08
  • Spring AOP 自定義注解的實(shí)現(xiàn)代碼

    Spring AOP 自定義注解的實(shí)現(xiàn)代碼

    本篇文章主要介紹了Spring AOP 自定義注解的實(shí)現(xiàn)代碼,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-04-04
  • 淺談Java數(shù)值類型的轉(zhuǎn)換與強(qiáng)制轉(zhuǎn)換

    淺談Java數(shù)值類型的轉(zhuǎn)換與強(qiáng)制轉(zhuǎn)換

    這篇文章主要介紹了Java數(shù)值類型的轉(zhuǎn)換與強(qiáng)制轉(zhuǎn)換,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-04-04
  • Stream中的Peek操作代碼

    Stream中的Peek操作代碼

    這篇文章主要介紹了Stream中的Peek操作,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-09-09
  • Java數(shù)據(jù)結(jié)構(gòu)二叉樹難點(diǎn)解析

    Java數(shù)據(jù)結(jié)構(gòu)二叉樹難點(diǎn)解析

    樹是一種重要的非線性數(shù)據(jù)結(jié)構(gòu),直觀地看,它是數(shù)據(jù)元素(在樹中稱為結(jié)點(diǎn))按分支關(guān)系組織起來的結(jié)構(gòu),很象自然界中的樹那樣。樹結(jié)構(gòu)在客觀世界中廣泛存在,如人類社會(huì)的族譜和各種社會(huì)組織機(jī)構(gòu)都可用樹形象表示
    2021-10-10

最新評(píng)論