Spring?boot?整合RabbitMQ實現(xiàn)通過RabbitMQ進行項目的連接
什么是RabbitMQ
一款基于AMQP用于軟件之間通信的中間件 。消費并不需要確保提供方存在,實現(xiàn)了服務(wù)之間的高度解耦
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。
基本組成:Queue:消息隊列,存儲消息的隊列,消息到達隊列并轉(zhuǎn)發(fā)給指定的消費方;
Exchange消息隊列交換機,按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個隊列,對消息進行過慮;
消費者:消費信息;
提供者:發(fā)送消息;
官網(wǎng) Messaging that just works — RabbitMQ
消息隊列:接受并轉(zhuǎn)發(fā)消息,類似于快遞公司
product : 消息的發(fā)送者,生產(chǎn)者
consumer:消息的消費者,從隊列獲取消息,并且使用
queue :先進先出,一個queue可以對應(yīng)多個consumer
消息隊列的優(yōu)點
代碼解耦,提高系統(tǒng)穩(wěn)定性
應(yīng)對流量高峰,降低流量沖擊,面對秒殺這種情況時,請求進來先去排隊,可以保證系統(tǒng)的穩(wěn)定
異步執(zhí)行,提高系統(tǒng)響應(yīng)速度
消息隊列的特性
性能好
它是一種基礎(chǔ)組件
支持消息確認(rèn),為了防止數(shù)據(jù)丟失以及應(yīng)對特殊情況,在數(shù)據(jù)沒有處理完,沒有確認(rèn)之前消息不會丟掉。
RabbitMQ特點
路由能力靈活強大
開源免費
支持編程語言多
應(yīng)用廣泛,社區(qū)活躍
有開箱即用的監(jiān)控和管理后臺
RabbitMQ核心概念
生產(chǎn)者數(shù)量是不限制的,生產(chǎn)者生產(chǎn)的消息Message進入交換機,交換一可以連接多個隊列也可以僅連接一個對聯(lián),交換機與隊列的關(guān)系是不固定的,交換機會綁定到隊列上(Binding)根據(jù)的規(guī)則就是Routing Key路由鍵用來確定交換機與隊列如何進行綁定 ,消息經(jīng)過交換機經(jīng)過連接發(fā)送個消費者,在連接中多多個信道,數(shù)據(jù)都是在信道中進行讀寫的,消費者從中提取想要的消息進行處理。Broker(服務(wù)實例)也就是服務(wù)端,Virtual Host (虛擬主機)同一個RabbitMQ可能給多個服務(wù)進行使用,服務(wù)與服務(wù)之間想要隔離開就可以使用虛擬主機進行隔離。
Producer :消息生產(chǎn)者
Message :消息
Exchange :交換機
Binding :綁定交換機和隊列
Routing key :路由鍵,決定路由規(guī)則
Queue :隊列,存儲消息
Connection :連接服務(wù)端
Channel :信道,讀寫數(shù)據(jù).
Consumer :消費者
Broker :服務(wù)實例
Virtual host :虛擬主機,用于區(qū)分不同服務(wù),類似于不同域名,不會相互影響
安裝RabbitMQ
LINUX環(huán)境下安裝3.8.2 使用Xshell
先進行環(huán)境配置
連接成功以后輸入
echo "export LC_ALL=en_US.UTF-8" >> /etc/profile 把編碼設(shè)置成utf-8
source /etc/profile 使設(shè)置生效
輸入curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash 配置RabbitMQ源
看到這個命令就可以進行下一步了
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
配置erlang環(huán)境
看到這個命令進行下一步
sudo yum install rabbitmq-server-3.8.2-1.el7.noarch
輸入y
常用命令
開啟web管理界面
rabbitmq-plugins enable rabbitmq_management
停止RabbitMQ
rabbitmqctl stop
設(shè)置開機啟動
systemctl enable rabbitmq-server
啟動RabbitMQ
systemctl start rabbitmq-server
看看端口有沒有起來,查看狀態(tài)
rabbitmqctl status
要檢查RabbitMQ服務(wù)器的狀態(tài),請運行:
systemctl status rabbitmq-server
Windows
先安裝erlang并配置環(huán)境,安裝RabbitMQ
鏈接:https://pan.baidu.com/s/1S4D2zh-NSoXh-QPQVNBi-w
提取碼:1111
這里直接放上鏈接,erlang安裝好后要去配置環(huán)境
解壓縮后sbin目錄下,rabbitmq-server.bat 這個文件就是啟動
用終端cmd輸入:
cd d:\你的RabbitMQ按照地址\sbin
rabbitmq-plugins enable rabbitmq_management
rabbitmq-server
然后就可以用guest訪問http://127.0.0.1:15672/#/
賬號密碼都是guest
RabbitMQ實操分布了解
1 生產(chǎn)者
這里的前提是你有個云服務(wù)器,并且已經(jīng)完成了配置,為了操作簡便這里就用本機了哈
我們要有一個管理者啊在sbin目錄輸入
rabbitmqctl add_user newadmin newpassword
rabbitmqctl set_user_tags newadmin administrator
rabbitmqctl set_permissions -p / newadmin ".*" ".*" ".*"//這一步已經(jīng)把在虛擬主機上把權(quán)限配置了
賬號test 密碼123456
新建一個mavene項目,
2 引入依賴
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!-- 記錄日志--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.29</version> </dependency> </dependencies>
/** * 描述 發(fā)送類 連接到服務(wù)端 發(fā)送 退出 */ public class Send { //設(shè)置隊列的名字 private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //創(chuàng)建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設(shè)置RabbitMQ地址 factory.setHost("127.0.0.1"); factory.setUsername("test"); factory.setPassword("123456"); //建立連接 Connection connection = factory.newConnection(); //獲得信道 Channel channel = connection.createChannel(); //聲明隊列 // queueName 持久存在? 獨有? 自動刪除? channel.queueDeclare(QUEUE_NAME, false, false, false, null); //發(fā)布消息 String message = "Hello World! "; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("發(fā)送了消息:" + message); //關(guān)閉連接 channel.close(); connection.close(); } }
運行一下
發(fā)送成功了 如果我么連接不到RabbitMQ是無法正常發(fā)送的
2 消費者
我么要做的就是把剛剛發(fā)送的存儲在隊列里的消息拿到并打印出來
** * 描述: 接收消息,并打印,持續(xù)運行 */ public class Recvice { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //創(chuàng)建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設(shè)置RabbitMQ地址 factory.setHost("127.0.0.1"); factory.setUsername("test"); factory.setPassword("123456"); //建立連接 Connection connection = factory.newConnection(); //獲得信道 Channel channel = connection.createChannel(); //聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //接收消息并消費 queueName 自動簽收 處理消息 channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到消息:" + message); } }); } }
可以看到Receive是(打錯了,尬)一直運行的,我么把發(fā)送的消息改一下再發(fā)送試試
我們之前設(shè)置的是自動接收消息們可以看到運行時成功的
去web控制臺也能看到是有hello這個隊列的 還有更多的功能就靠你們自己去探索了
Springboot 整合RabbitMQ代碼實操
1 新建兩個Spring項目 一個生產(chǎn)者,一個消費者不需要引入依賴一會兒手動加
主要關(guān)鍵是定義隊列 queue 定義routingKey
生產(chǎn)者
配置文件
guest是默認(rèn)的用戶只能本機時使用
server.port=8080 spring.application.name=producer spring.rabbitmq.addresses=127.0.0.1:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000
依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>spring-boot-rabbirmq-producer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-rabbirmq-producer</name> <description>spring-boot-rabbirmq-producer</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
我們只在原基礎(chǔ)上加了一個依賴
spring-boot-starter-amqp
啟動類
@SpringBootApplication public class SpringBootRabbirmqProducerApplication { public static void main(String[] args) { SpringApplication.run(SpringBootRabbirmqProducerApplication.class, args); } }
發(fā)送消息類
/** * 描述: 發(fā)送消息 */ @Component public class MessageSender { @Autowired private AmqpTemplate rabbitmqTemplate; public void send1() { String message = "This is message 1, routing key is hello.sayHello"; System.out.println("發(fā)送了:"+message); // 交換機 key 內(nèi)容 this.rabbitmqTemplate.convertAndSend("bootExchange", "hello.sayHello", message); } public void send2() { String message = "This is message 2, routing key is hello.sayNothing"; System.out.println("發(fā)送了:"+message); this.rabbitmqTemplate.convertAndSend("bootExchange", "hello.sayNothing", message); } }
配置類
/** * 描述: rabbitmq配置類 */ @Configuration public class TopicRabbitConfig { //定義隊列 注意類型:import org.springframework.amqp.core.Queue; @Bean public Queue queue1() { return new Queue("queue1"); } @Bean public Queue queue2() { return new Queue("queue2"); } //交換機 @Bean TopicExchange exchange() { return new TopicExchange("bootExchange"); } //將隊列綁定到交換機 @Bean Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange) { return BindingBuilder.bind(queue1).to(exchange).with("hello.sayHello"); } @Bean Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange) { return BindingBuilder.bind(queue2).to(exchange).with("hello.#"); } }
這里注意第一個消息的routingkey是跟配置類一樣的hello.sayHello 就代表 我們這個交換機是僅能識別hello.sayHello的
第二個交換機的routingkey是hello.# 那就意味著只要key是hello.()類型我們都能識別到也就是第一個和第二個消息都能識別到
編寫測試類用來發(fā)送消息
@SpringBootTest class SpringBootRabbirmqProducerApplicationTests { @Autowired MessageSender messageSender; @Test public void send1(){ messageSender.send1(); } @Test public void send2(){ messageSender.send2(); } }
生產(chǎn)者就編寫完成
消費者
配置文件,大體一樣,用戶我用的管理者權(quán)限的用戶test 端口號不能一樣
server.port=8081 spring.application.name=consumer spring.rabbitmq.addresses=127.0.0.1:5672 spring.rabbitmq.username=test spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000
依賴 與生產(chǎn)者一樣只用加一個
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>spring-boot-rabbitmq-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-boot-rabbitmq-consumer</name> <description>spring-boot-rabbitmq-consumer</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
啟動類
@SpringBootApplication public class SpringBootRabbitmqConsumerApplication { public static void main(String[] args) { SpringApplication.run(SpringBootRabbitmqConsumerApplication.class, args); } }
消費者1 消費者一綁的隊列是queue1 接收消息是要通過交換機-> 隊列-> 信道 那就意味著隊列1中將有hello.sayHello
/** * 描述: 消費者1 */ @Component @RabbitListener(queues = "queue1") public class Receiver1 { //處理方法 @RabbitHandler public void process(String message) { System.out.println("Receiver1: " + message); } }
消費者2
/** * 描述: 消費者2 */ @Component @RabbitListener(queues = "queue2") public class Receiver2 { @RabbitHandler public void process(String message) { System.out.println("Receiver2: " + message); } }
運行結(jié)果
這本身是兩個獨立的項目,但是通過RabbitMQ使兩個項目產(chǎn)生了連接,Springboot完成了對RabbitMQ的整合。
到此這篇關(guān)于Springboot整合RabbitMQ實現(xiàn)通過RabbitMQ進行項目的連接的文章就介紹到這了,更多相關(guān)Springboot整合RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring boot使用logback實現(xiàn)多環(huán)境日志配置詳解
這篇文章主要介紹了spring boot使用logback實現(xiàn)多環(huán)境日志配置詳解,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-08-08springcloud gateway如何實現(xiàn)路由和負(fù)載均衡
這篇文章主要介紹了springcloud gateway如何實現(xiàn)路由和負(fù)載均衡的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07關(guān)于@PostConstruct、afterPropertiesSet和init-method的執(zhí)行順序
這篇文章主要介紹了關(guān)于@PostConstruct、afterPropertiesSet和init-method的執(zhí)行順序,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09解讀Jvm的內(nèi)存結(jié)構(gòu)與GC及jvm參數(shù)調(diào)優(yōu)
這篇文章主要介紹了解讀Jvm的內(nèi)存結(jié)構(gòu)與GC及jvm參數(shù)調(diào)優(yōu)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05SpringCloud讓微服務(wù)實現(xiàn)指定程序調(diào)用
這篇文章主要介紹了SpringCloud讓微服務(wù)實現(xiàn)指定程序調(diào)用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-06-06