SpringBoot整合RabbitMQ的5種模式實(shí)戰(zhàn)
一、環(huán)境準(zhǔn)備

1、pom依賴
<!-- 父工程依賴 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
2、配置文件
server:
port: 8080
spring:
rabbitmq:
host: 192.168.131.171
port: 5672
username: jihu
password: jihu
virtual-host: /jihu
3、啟動(dòng)類
@SpringBootApplication
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class);
}
}
5、Swagger2類
@Configuration
@EnableSwagger2
public class Swagger2 {
// http://127.0.0.1:8080/swagger-ui.html
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.jihu"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("極狐-Spring Boot中使用spring-boot-starter-amqp集成rabbitmq")
.description("測(cè)試SpringBoot整合進(jìn)行各種工作模式信息的發(fā)送")
/*
.termsOfServiceUrl("https://www.jianshu.com/p/c79f6a14f6c9")
*/
.contact("roykingw")
.version("1.0")
.build();
}
}
6、ProducerController
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
//helloWorld 直連模式
@ApiOperation(value = "helloWorld發(fā)送接口", notes = "直接發(fā)送到隊(duì)列")
@GetMapping(value = "/helloWorldSend")
public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
//設(shè)置部分請(qǐng)求參數(shù)
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//發(fā)消息
rabbitTemplate.send("helloWorldqueue", new Message(message.getBytes("UTF-8"), messageProperties));
return "message sended : " + message;
}
//工作隊(duì)列模式
@ApiOperation(value = "workqueue發(fā)送接口", notes = "發(fā)送到所有監(jiān)聽該隊(duì)列的消費(fèi)")
@GetMapping(value = "/workqueueSend")
public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//制造多個(gè)消息進(jìn)行發(fā)送操作
for (int i = 0; i < 10; i++) {
rabbitTemplate.send("work_sb_mq_q", new Message(message.getBytes("UTF-8"), messageProperties));
}
return "message sended : " + message;
}
// pub/sub 發(fā)布訂閱模式 交換機(jī)類型 fanout
@ApiOperation(value = "fanout發(fā)送接口", notes = "發(fā)送到fanoutExchange。消息將往該exchange下的所有queue轉(zhuǎn)發(fā)")
@GetMapping(value = "/fanoutSend")
public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里發(fā)送消息。分發(fā)到exchange下的所有queue
rabbitTemplate.send("fanoutExchange", "", new Message(message.getBytes("UTF-8"), messageProperties));
return "message sended : " + message;
}
//routing路由工作模式 交換機(jī)類型 direct
@ApiOperation(value = "direct發(fā)送接口", notes = "發(fā)送到directExchange。exchange轉(zhuǎn)發(fā)消息時(shí),會(huì)往routingKey匹配的queue發(fā)送")
@GetMapping(value = "/directSend")
public Object routingSend(String routingKey, String message) throws AmqpException, UnsupportedEncodingException {
if (null == routingKey) {
routingKey = "china.changsha";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里發(fā)送消息。分發(fā)到exchange下的所有queue
rabbitTemplate.send("directExchange", routingKey, new Message(message.getBytes("UTF-8"), messageProperties));
return "message sended : routingKey >" + routingKey + ";message > " + message;
}
//topic 工作模式 交換機(jī)類型 topic
@ApiOperation(value = "topic發(fā)送接口", notes = "發(fā)送到topicExchange。exchange轉(zhuǎn)發(fā)消息時(shí),會(huì)往routingKey匹配的queue發(fā)送,*代表一個(gè)單詞,#代表0個(gè)或多個(gè)單詞。")
@GetMapping(value = "/topicSend")
public Object topicSend(String routingKey, String message) throws AmqpException, UnsupportedEncodingException {
if (null == routingKey) {
routingKey = "changsha.kf";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里發(fā)送消息。分發(fā)到exchange下的所有queue
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"), messageProperties));
return "message sended : routingKey >" + routingKey + ";message > " + message;
}
}
7、ConcumerReceiver
@Component
public class ConcumerReceiver {
//直連模式的多個(gè)消費(fèi)者,會(huì)分到其中一個(gè)消費(fèi)者進(jìn)行消費(fèi)。類似task模式
//通過(guò)注入RabbitContainerFactory對(duì)象,來(lái)設(shè)置一些屬性,相當(dāng)于task里的channel.basicQos
@RabbitListener(queues = "helloWorldqueue")
public void helloWorldReceive(String message) {
System.out.println("helloWorld模式 received message : " + message);
}
//工作隊(duì)列模式
@RabbitListener(queues = "work_sb_mq_q")
public void wordQueueReceiveq1(String message) {
System.out.println("工作隊(duì)列模式1 received message : " + message);
}
@RabbitListener(queues = "work_sb_mq_q")
public void wordQueueReceiveq2(String message) {
System.out.println("工作隊(duì)列模式2 received message : " + message);
}
//pub/sub模式進(jìn)行消息監(jiān)聽
@RabbitListener(queues = "fanout.q1")
public void fanoutReceiveq1(String message) {
System.out.println("發(fā)布訂閱模式1received message : " + message);
}
@RabbitListener(queues = "fanout.q2")
public void fanoutReceiveq2(String message) {
System.out.println("發(fā)布訂閱模式2 received message : " + message);
}
//Routing路由模式
@RabbitListener(queues = "direct_sb_mq_q1")
public void routingReceiveq1(String message) {
System.out.println("Routing路由模式routingReceiveq11111 received message : " + message);
}
@RabbitListener(queues = "direct_sb_mq_q2")
public void routingReceiveq2(String message) {
System.out.println("Routing路由模式routingReceiveq22222 received message : " + message);
}
//topic 模式
//注意這個(gè)模式會(huì)有優(yōu)先匹配原則。例如發(fā)送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不會(huì)再去匹配*.ITd
@RabbitListener(queues = "topic_sb_mq_q1")
public void topicReceiveq1(String message) {
System.out.println("Topic模式 topic_sb_mq_q1 received message : " + message);
}
@RabbitListener(queues = "topic_sb_mq_q2")
public void topicReceiveq2(String message) {
System.out.println("Topic模式 topic_sb_mq_q2 received message : " + message);
}
}
二、簡(jiǎn)單模式
隊(duì)列配置:
/**
* HelloWorld rabbitmq第一個(gè)工作模式
* 直連模式只需要聲明隊(duì)列,所有消息都通過(guò)隊(duì)列轉(zhuǎn)發(fā)。
* 無(wú)需設(shè)置交換機(jī)
*/
@Configuration
public class HelloWorldConfig {
@Bean
public Queue setQueue() {
return new Queue("helloWorldqueue");
}
}
三、工作隊(duì)列模式
@Configuration
public class WorkConfig {
//聲明隊(duì)列
@Bean
public Queue workQ1() {
return new Queue("work_sb_mq_q");
}
}
四、廣播模式(Fanout)
/**
* Fanout模式需要聲明exchange,并綁定queue,由exchange負(fù)責(zé)轉(zhuǎn)發(fā)到queue上。
* 廣播模式 交換機(jī)類型設(shè)置為:fanout
*/
@Configuration
public class FanoutConfig {
//聲明隊(duì)列
@Bean
public Queue fanoutQ1() {
return new Queue("fanout.q1");
}
@Bean
public Queue fanoutQ2() {
return new Queue("fanout.q2");
}
//聲明exchange
@Bean
public FanoutExchange setFanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//聲明Binding,exchange與queue的綁定關(guān)系
@Bean
public Binding bindQ1() {
return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
}
@Bean
public Binding bindQ2() {
return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());
}
}
五、直連模式(Direct)
/*
路由模式|Routing模式 交換機(jī)類型:direct
*/
@Configuration
public class DirectConfig {
//聲明隊(duì)列
@Bean
public Queue directQ1() {
return new Queue("direct_sb_mq_q1");
}
@Bean
public Queue directQ2() {
return new Queue("direct_sb_mq_q2");
}
//聲明exchange
@Bean
public DirectExchange setDirectExchange() {
return new DirectExchange("directExchange");
}
//聲明binding,需要聲明一個(gè)routingKey
@Bean
public Binding bindDirectBind1() {
return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with("china.changsha");
}
@Bean
public Binding bindDirectBind2() {
return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with("china.beijing");
}
}
六、通配符模式(Topic)
/*
Topics模式 交換機(jī)類型 topic
* */
@Configuration
public class TopicConfig {
//聲明隊(duì)列
@Bean
public Queue topicQ1() {
return new Queue("topic_sb_mq_q1");
}
@Bean
public Queue topicQ2() {
return new Queue("topic_sb_mq_q2");
}
//聲明exchange
@Bean
public TopicExchange setTopicExchange() {
return new TopicExchange("topicExchange");
}
//聲明binding,需要聲明一個(gè)roytingKey
@Bean
public Binding bindTopicHebei1() {
return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*");
}
@Bean
public Binding bindTopicHebei2() {
return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing");
}
}
測(cè)試
我們啟動(dòng)上面的SpringBoot項(xiàng)目。
然后我們?cè)L問(wèn)swagger地址:http://127.0.0.1:8080/swagger-ui.html

然后我們就可以使用swagger測(cè)試接口了。


或者可以使用postman進(jìn)行測(cè)試。
到此這篇關(guān)于SpringBoot整合RabbitMQ的5種模式實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
一篇文章教你如何用Java自定義一個(gè)參數(shù)校驗(yàn)器
這篇文章主要介紹了使用java自定義一個(gè)參數(shù)校驗(yàn)器,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)2021-09-09
SpringBoot feign動(dòng)態(tài)設(shè)置數(shù)據(jù)源(https請(qǐng)求)
這篇文章主要介紹了SpringBoot如何在運(yùn)行時(shí)feign動(dòng)態(tài)添加數(shù)據(jù)源,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-08-08
spring?boot+vue實(shí)現(xiàn)JSAPI微信支付的完整步驟
JSAPI支付是用戶在微信中打開商戶的H5頁(yè)面,商戶在H5頁(yè)面通過(guò)調(diào)用微信支付提供的JSAPI接口調(diào)起微信支付模塊完成支付,下面這篇文章主要給大家介紹了關(guān)于spring?boot+vue實(shí)現(xiàn)JSAPI微信支付的相關(guān)資料,需要的朋友可以參考下2022-05-05
SpringBoot 使用hibernate validator校驗(yàn)
這篇文章主要介紹了SpringBoot 使用hibernate validator校驗(yàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-11-11
淺談一下RabbitMQ、Kafka和RocketMQ消息中間件對(duì)比
這篇文章主要介紹了淺談一下RabbitMQ、Kafka和RocketMQ消息中間件對(duì)比,消息中間件屬于分布式系統(tǒng)中一個(gè)字系統(tǒng),關(guān)注于數(shù)據(jù)的發(fā)送和接收,利用高效可靠的異步信息傳遞機(jī)制對(duì)分布式系統(tǒng)中的其余各個(gè)子系統(tǒng)進(jìn)行集成,需要的朋友可以參考下2023-05-05

