SpringAMQP消息隊(duì)列(SpringBoot集成RabbitMQ方式)
一、初始配置
1、導(dǎo)入maven坐標(biāo)
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>2、yml配置
spring:
rabbitmq:
host: 你的rabbitmq的ip
port: 5672
username: guest
password: guest二、基本消息隊(duì)列

1、創(chuàng)建隊(duì)列
訪問(wèn)接口:http://localhost:15672,賬號(hào)密碼都為guest

進(jìn)入后左下角有Add queue添加隊(duì)列,我已添加隊(duì)列為MqTest1
2、發(fā)布消息
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
String queue="MqTest1";
String message="message1";
rabbitTemplate.convertAndSend(queue,message);
}
}此時(shí)可以看到隊(duì)列有一個(gè)消息

3、接受消息
package com.rabbitmqdemoconsumer.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitLeistener {
@RabbitListener(queues = "MqTest1")
public void listenSimpleQueueMessage(String msg){
System.out.println("接收到的消息:"+msg);
}
}此時(shí)控制臺(tái)輸出接收到的消息

三、工作消息隊(duì)列(Work Queue)

可以提高消息處理速度,避免隊(duì)列消息堆積
1、發(fā)布消息
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
String queue="MqTest1";
String message="message1";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(queue,message);
}
}
}此時(shí)隊(duì)列有10條消息
2、接受消息
package com.rabbitmqdemoconsumer.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitLeistener {
@RabbitListener(queues = "MqTest1")
public void listenSimpleQueueMessage1(String msg){
System.out.println("consume1接收到的消息:"+msg);
}
@RabbitListener(queues = "MqTest1")
public void listenSimpleQueueMessage2(String msg){
System.out.println("consume2接收到的消息:"+msg);
}
}3、控制臺(tái)輸出結(jié)果
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
consume1接收到的消息:message1
consume2接收到的消息:message1
4、消息預(yù)取問(wèn)題
但是此時(shí)有一個(gè)問(wèn)題就是消息預(yù)取,比如隊(duì)列有10條消息,兩個(gè)消費(fèi)者各自直接先預(yù)取5個(gè)消息,如果一個(gè)消費(fèi)者接受消息的速度慢,一個(gè)快,就會(huì)導(dǎo)致一個(gè)消費(fèi)者已經(jīng)完成工作,另一個(gè)還在慢慢處理,會(huì)造成消息堆積消費(fèi)者身上,要解決這個(gè)問(wèn)題需要在yml文件配置相關(guān)配置
rabbitmq:
host: 43.140.244.236
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
prefetch: 1 #每次只能取一個(gè),處理完才能取下一個(gè)消息這樣可以避免消息預(yù)取導(dǎo)致堆積
四、發(fā)布訂閱模式
exchange是交換機(jī),負(fù)責(zé)消息路由,但不存儲(chǔ)消息,路由失敗則消息丟失

五、發(fā)布訂閱模式之廣播模式(Fanout)

1、Fanout配置類(lèi)(@Bean聲明)
package com.rabbitmqdemoconsumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanountConfig {
//交換機(jī)聲明
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("FanountExchange");
}
//聲明隊(duì)列1
@Bean
public Queue Fanount_Qeueue1(){
return new Queue("Fanount_Qeueue1");
}
//聲明隊(duì)列2
@Bean
public Queue Fanount_Qeueue2(){
return new Queue("Fanount_Qeueue2");
}
//綁定交換機(jī)和隊(duì)列
@Bean
public Binding bindingFanount_Qeueue1(Queue Fanount_Qeueue1,FanoutExchange fanoutExchange){
return BindingBuilder.bind(Fanount_Qeueue1).to(fanoutExchange);
}
@Bean
public Binding bindingFanount_Qeueue2(Queue Fanount_Qeueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(Fanount_Qeueue2).to(fanoutExchange);
}
}可以看到聲明的隊(duì)列

已經(jīng)聲明的交換機(jī)(第一個(gè))

綁定關(guān)系

2、發(fā)送消息
首先發(fā)送10條消息,經(jīng)過(guò)交換機(jī)轉(zhuǎn)發(fā)到隊(duì)列
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="FanountExchange";
String message="message";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"",message);
}
}
}此時(shí)可以看到兩個(gè)隊(duì)列各自有十條消息

3、接受消息
//監(jiān)聽(tīng)交換機(jī)Fanount_Qeueue1
@RabbitListener(queues = "Fanount_Qeueue1")
public void listenFanountQeueue1(String msg){
System.out.println("Fanount_Qeueue1接收到的消息:"+msg);
}
//監(jiān)聽(tīng)交換機(jī)Fanount_Qeueue2
@RabbitListener(queues = "Fanount_Qeueue2")
public void listenFanountQeueue2(String msg){
System.out.println("Fanount_Qeueue2接收到的消息:"+msg);
}控制臺(tái)結(jié)果如下(共發(fā)送20條,每個(gè)隊(duì)列10條)
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue1接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message
Fanount_Qeueue2接收到的消息:message
六、發(fā)布訂閱模式之路由模式(Direct)
會(huì)將消息根據(jù)規(guī)則路由到指定的隊(duì)列

1、聲明(基于@RabbitListener聲明)
package com.rabbitmqdemoconsumer.rabbitmq;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitLeistener {
/**
* 綁定交換機(jī)和隊(duì)列,并為key賦值
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "DirectQueue1"),
exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("listenDirectQueue1接收到的消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "DirectQueue2"),
exchange = @Exchange(name = "DirectExchange",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("listenDirectQueue2接收到的消息:"+msg);
}
}此時(shí)可以看到聲明的隊(duì)列

聲明的交換機(jī)(第一個(gè))

綁定關(guān)系

2、發(fā)送給blue
發(fā)送消息
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="DirectExchange";
String message="HelloWorld";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"blue",message);
}
}
}接收消息
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
3、發(fā)送給red
發(fā)送消息
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="DirectExchange";
String message="HelloWorld";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"blue",message);
}
}
}接收消息
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
listenDirectQueue2(red,yellow)接收到的消息:HelloWorld
listenDirectQueue1(red,blue)接收到的消息:HelloWorld
七、發(fā)布訂閱模式之廣播模式(Topic)
Queue與Exchange指定BindingKey可以使用通配符:
- #:代指0個(gè)或多個(gè)單詞
- *:代指一個(gè)單詞
比如:
- bindingkey: china.# ->中國(guó)的所有消息
- bindingkey: #.weather ->所以國(guó)家的天氣

1、聲明
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "TopicQueue1"),
exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue1(String msg){
System.out.println("listenTopicQueue1接收到的消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "TopicQueue2"),
exchange = @Exchange(name = "TopicExchange",type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg){
System.out.println("listenTopicQueue2接收到的消息:"+msg);
}隊(duì)列

交換機(jī)(第四個(gè))

綁定關(guān)系

2、發(fā)送消息(測(cè)試1)
package com.rabbitmqdemo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="TopicExchange";
String message="HelloWorld";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"china.news",message);
}
}
}接收消息
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue2接收到的消息:HelloWorld
3、發(fā)送消息(測(cè)試2)
發(fā)送消息
package com.rabbitmqdemo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitMQDemoPublishApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads2() {
String exchange="TopicExchange";
String message="HelloWorld";
for (int i=0;i<10;i++){
rabbitTemplate.convertAndSend(exchange,"china.weather",message);
}
}
}接收消息
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
TopicQueue1接收到的消息:HelloWorld
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java模擬發(fā)送form-data的請(qǐng)求方式
這篇文章主要介紹了java模擬發(fā)送form-data的請(qǐng)求方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-05-05
詳解Spring Batch 輕量級(jí)批處理框架實(shí)踐
這篇文章主要介紹了詳解Spring Batch 輕量級(jí)批處理框架實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06
java使用selenium自動(dòng)化WebDriver等待的示例代碼
顯式等待和隱式等待是WebDriver中兩種常用的等待方式,它們都可以用來(lái)等待特定的條件滿足后再繼續(xù)執(zhí)行代碼,本文給大家介紹java使用selenium自動(dòng)化WebDriver等待,感興趣的朋友一起看看吧2023-09-09
Springboot源碼 AbstractAdvisorAutoProxyCreator解析
這篇文章主要介紹了Springboot源碼 AbstractAdvisorAutoProxyCreator解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08
Nacos客戶(hù)端配置中心緩存動(dòng)態(tài)更新實(shí)現(xiàn)源碼
這篇文章主要為大家介紹了Nacos客戶(hù)端配置中心緩存動(dòng)態(tài)更新實(shí)現(xiàn)源碼,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-03-03
SpringBoot中的multipartResolver上傳文件配置
這篇文章主要介紹了SpringBoot中的multipartResolver上傳文件配置,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10
java局部變量表的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例
在本篇文章里小編給大家整理的是一篇關(guān)于java局部變量表的基礎(chǔ)知識(shí)點(diǎn)及實(shí)例,有需要的朋友們可以學(xué)習(xí)參考下。2021-06-06

