亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

spring boot集成rabbitmq的實(shí)例教程

 更新時(shí)間:2017年11月01日 11:32:19   作者:陳凡了  
這篇文章主要給大家介紹了關(guān)于spring boot集成rabbitmq的相關(guān)資料,springboot集成RabbitMQ非常簡(jiǎn)單,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。

一、RabbitMQ的介紹  

RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現(xiàn)已經(jīng)轉(zhuǎn)讓給apache).

消息中間件的工作過(guò)程可以用生產(chǎn)者消費(fèi)者模型來(lái)表示.即,生產(chǎn)者不斷的向消息隊(duì)列發(fā)送信息,而消費(fèi)者從消息隊(duì)列中消費(fèi)信息.具體過(guò)程如下:

從上圖可看出,對(duì)于消息隊(duì)列來(lái)說(shuō),生產(chǎn)者,消息隊(duì)列,消費(fèi)者是最重要的三個(gè)概念,生產(chǎn)者發(fā)消息到消息隊(duì)列中去,消費(fèi)者監(jiān)聽(tīng)指定的消息隊(duì)列,并且當(dāng)消息隊(duì)列收到消息之后,接收消息隊(duì)列傳來(lái)的消息,并且給予相應(yīng)的處理.消息隊(duì)列常用于分布式系統(tǒng)之間互相信息的傳遞.

對(duì)于RabbitMQ來(lái)說(shuō),除了這三個(gè)基本模塊以外,還添加了一個(gè)模塊,即交換機(jī)(Exchange).它使得生產(chǎn)者和消息隊(duì)列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機(jī),而交換機(jī)則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對(duì)應(yīng)的消息隊(duì)列.那么RabitMQ的工作流程如下所示:

緊接著說(shuō)一下交換機(jī).交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊(duì)列.交換機(jī)有四種類(lèi)型,分別為Direct,topic,headers,Fanout.

Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡(jiǎn)單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對(duì)應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中.

topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中.

headers也是根據(jù)一個(gè)規(guī)則進(jìn)行匹配,在消息隊(duì)列和交換機(jī)綁定的時(shí)候會(huì)指定一組鍵值對(duì)規(guī)則,而發(fā)送消息的時(shí)候也會(huì)指定一組鍵值對(duì)規(guī)則,當(dāng)兩組鍵值對(duì)規(guī)則相匹配的時(shí)候,消息會(huì)被發(fā)送到匹配的消息隊(duì)列中.

Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略. 

概念:

  • 生產(chǎn)者 消息的產(chǎn)生方,負(fù)責(zé)將消息推送到消息隊(duì)列
  • 消費(fèi)者 消息的最終接受方,負(fù)責(zé)監(jiān)聽(tīng)隊(duì)列中的對(duì)應(yīng)消息,消費(fèi)消息
  • 隊(duì)列 消息的寄存器,負(fù)責(zé)存放生產(chǎn)者發(fā)送的消息
  • 交換機(jī) 負(fù)責(zé)根據(jù)一定規(guī)則分發(fā)生產(chǎn)者產(chǎn)生的消息
  • 綁定 完成交換機(jī)和隊(duì)列之間的綁定

模式:

1、direct

直連模式,用于實(shí)例間的任務(wù)分發(fā)

2、topic

話(huà)題模式,通過(guò)可配置的規(guī)則分發(fā)給綁定在該exchange上的隊(duì)列

3、headers

適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達(dá)規(guī)則

4、fanout

分發(fā)給所有綁定到該exchange上的隊(duì)列,忽略routing key

安裝

單機(jī)版安裝很簡(jiǎn)單,大概步驟如下:

# 安裝erlang包
 yum install erlang
# 安裝socat
 yum install socat
# 安裝rabbit 
 rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm 
# 啟動(dòng)服務(wù)
 rabbitmq-server start
# 增加管理控制功能
 rabbitmq-plugins enable rabbitmq_management
# 增加用戶(hù):
 sudo rabbitmqctl add_user root password
 rabbitmqctl set_user_tags root administrator 
 rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安裝,可參考這篇文章:

     rabbitmq集群安裝

以上就是rabbitmq的介紹,下面開(kāi)始本文的正文:spring boot 集成rabbitmq ,本人在學(xué)習(xí)rabbitmq時(shí)發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹springboot和rabbitmq如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個(gè)人調(diào)研過(guò)程,整理此篇文章。

二、springboot配置

廢話(huà)少說(shuō)直接上代碼:

配置參數(shù)

application.yml:

spring:
 rabbitmq:
 addresses: 192.168.1.1:5672
 username: username
 password: password
 publisher-confirms: true
 virtual-host: /

java config讀取參數(shù)

/**
 * RabbitMq配置文件讀取類(lèi)
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

 @Value("${spring.rabbitmq.addresses}")
 private String addresses;
 @Value("${spring.rabbitmq.username}")
 private String username;
 @Value("${spring.rabbitmq.password}")
 private String password;
 @Value("${spring.rabbitmq.publisher-confirms}")
 private Boolean publisherConfirms;
 @Value("${spring.rabbitmq.virtual-host}")
 private String virtualHost;

 // 構(gòu)建mq實(shí)例工廠
 @Bean
 public ConnectionFactory connectionFactory(){
 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 connectionFactory.setAddresses(addresses);
 connectionFactory.setUsername(username);
 connectionFactory.setPassword(password);
 connectionFactory.setPublisherConfirms(publisherConfirms);
 connectionFactory.setVirtualHost(virtualHost);
 return connectionFactory;
 }

 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
 return new RabbitAdmin(connectionFactory);
 }

 @Bean
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 public RabbitTemplate rabbitTemplate(){
 RabbitTemplate template = new RabbitTemplate(connectionFactory());
 return template;
 }
}

三、rabbitmq生產(chǎn)者配置

主要配置了直連和話(huà)題模式,其中話(huà)題模式設(shè)置兩個(gè)隊(duì)列(queueTopicTest1、queueTopicTest2),此兩個(gè)隊(duì)列在和交換機(jī)綁定時(shí)分別設(shè)置不同的routingkey(.TEST.以及l(fā)azy.#)來(lái)驗(yàn)證匹配模式。

/**
 * 用于配置交換機(jī)和隊(duì)列對(duì)應(yīng)關(guān)系
 * 新增消息隊(duì)列應(yīng)該按照如下步驟
 * 1、增加queue bean,參見(jiàn)queueXXXX方法
 * 2、增加queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

 /**
 * @Author:chenhf
 * @Description: 主題型交換機(jī)
 * @Date:下午5:49 2017/10/23
 * @param
 * @return
 */
 @Bean
 TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
 TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
 rabbitAdmin.declareExchange(contractTopicExchange);
 logger.debug("完成主題型交換機(jī)bean實(shí)例化");
 return contractTopicExchange;
 }
 /**
 * 直連型交換機(jī)
 */
 @Bean
 DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
 DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
 rabbitAdmin.declareExchange(contractDirectExchange);
 logger.debug("完成直連型交換機(jī)bean實(shí)例化");
 return contractDirectExchange;
 }

 //在此可以定義隊(duì)列

 @Bean
 Queue queueTest(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("測(cè)試隊(duì)列實(shí)例化完成");
 return queue;
 }

 //topic 1
 @Bean
 Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話(huà)題測(cè)試隊(duì)列1實(shí)例化完成");
 return queue;
 }
 //topic 2
 @Bean
 Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話(huà)題測(cè)試隊(duì)列2實(shí)例化完成");
 return queue;
 }


 //在此處完成隊(duì)列和交換機(jī)綁定
 @Bean
 Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與直連型交換機(jī)綁定完成");
 return binding;
 }
 //topic binding1
 @Bean
 Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與話(huà)題交換機(jī)1綁定完成");
 return binding;
 }

 //topic binding2
 @Bean
 Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測(cè)試隊(duì)列與話(huà)題交換機(jī)2綁定完成");
 return binding;
 }

}

在這里用到枚舉類(lèi):RabbitMqEnum

/**
 * 定義rabbitMq需要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class RabbitMqEnum {

 /**
 * @param
 * @Author:chenhf
 * @Description:定義數(shù)據(jù)交換方式
 * @Date:下午4:08 2017/10/23
 * @return
 */
 public enum Exchange {
 CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發(fā)"),
 CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),
 CONTRACT_DIRECT("CONTRACT_DIRECT", "點(diǎn)對(duì)點(diǎn)");

 private String code;
 private String name;

 Exchange(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

 /**
 * describe: 定義隊(duì)列名稱(chēng)
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueName {
 TESTQUEUE("TESTQUEUE", "測(cè)試隊(duì)列"),
 TOPICTEST1("TOPICTEST1", "topic測(cè)試隊(duì)列"),
 TOPICTEST2("TOPICTEST2", "topic測(cè)試隊(duì)列");

 private String code;
 private String name;

 QueueName(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }

 }

 /**
 * describe: 定義routing_key
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueEnum {
 TESTQUEUE("TESTQUEUE1", "測(cè)試隊(duì)列key"),
 TESTTOPICQUEUE1("*.TEST.*", "topic測(cè)試隊(duì)列key"),
 TESTTOPICQUEUE2("lazy.#", "topic測(cè)試隊(duì)列key");


 private String code;
 private String name;

 QueueEnum(String code, String name) {
 this.code = code;
 this.name = name;
 }

 public String getCode() {
 return code;
 }

 public String getName() {
 return name;
 }
 }

}

以上完成消息生產(chǎn)者的定義,下面封裝調(diào)用接口

測(cè)試時(shí)直接調(diào)用此工具類(lèi),testUser類(lèi)需自己實(shí)現(xiàn)

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/**
 * rabbitmq發(fā)送消息工具類(lèi)
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/

@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

 private RabbitTemplate rabbitTemplate;

 @Autowired
 public RabbitMqSender(RabbitTemplate rabbitTemplate) {
 this.rabbitTemplate = rabbitTemplate;
 this.rabbitTemplate.setConfirmCallback(this);
 }

 @Override
 public void confirm(CorrelationData correlationData, boolean b, String s) {
 logger.info("confirm: " + correlationData.getId());
 }

 /**
 * 發(fā)送到 指定routekey的指定queue
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqDirect(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
 }

 /**
 * 所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqTopic(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
 }
}

四、rabbitmq消費(fèi)者配置

springboot注解方式監(jiān)聽(tīng)隊(duì)列,無(wú)法手動(dòng)指定回調(diào),所以采用了實(shí)現(xiàn)ChannelAwareMessageListener接口,重寫(xiě)onMessage來(lái)進(jìn)行手動(dòng)回調(diào),詳見(jiàn)以下代碼,詳細(xì)介紹可以在spring的官網(wǎng)上找amqp相關(guān)章節(jié)閱讀

直連消費(fèi)者

通過(guò)設(shè)置TestUser的name來(lái)測(cè)試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺(tái)中隊(duì)列中消息是否被消費(fèi)

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
 @Bean("testQueueContainer")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TESTQUEUE");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("testQueueListener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 //通過(guò)設(shè)置TestUser的name來(lái)測(cè)試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺(tái)中隊(duì)列中消息是否被消費(fèi)
 if ("2".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }

 if ("1".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
 }

 }
 };
 }

}

topic消費(fèi)者1

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
 @Bean("topicTest1Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST1");
 container.setMessageListener(exampleListener1());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest1Listener")
 public ChannelAwareMessageListener exampleListener1(){
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 System.out.println("TOPICTEST1:"+testUser.toString());
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

 }
 };
 }




}

topic消費(fèi)者2

/**
 * 消費(fèi)者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
 @Bean("topicTest2Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST2");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }


 @Bean("topicTest2Listener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。

相關(guān)文章

最新評(píng)論