Java中RabbitMQ消息隊列的交換機詳解
RabbitMQ交換機
交換機屬性
- Name:交換機名稱
- Type:交換機類型 direct、topic、fanout、headers
- Durability:是否需要持久化,true為持久化
- Auto Delete:當(dāng)最后一個綁定到Exchange上的隊列刪除后,自動刪除該Exchange
- Internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為False
- Arguments:擴展參數(shù),用于擴展AMQP協(xié)議,定制化使用
直流交換機
直連交換機Direct Exchange(完全匹配路由key)
所有發(fā)送到Direct Exchange的消息會被轉(zhuǎn)發(fā)到RouteKey中指定的Queue
注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進行任何綁定(binding)操作,消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄;
消費端代碼
package com.xieminglu.rabbitmqapi.exchange.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示聲明了一個交換機 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示聲明了一個隊列 channel.queueDeclare(queueName, false, false, false, null); //建立一個綁定關(guān)系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數(shù):隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環(huán)獲取消息 while(true){ //獲取消息,如果沒有消息,這一步將會一直阻塞 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
生產(chǎn)端代碼
package com.xieminglu.rabbitmqapi.exchange.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author 小李飛刀 * @site www.javaxl.com * @company * @create 2019-11-18 10:22 */ public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 創(chuàng)建Connection Connection connection = connectionFactory.newConnection(); //3 創(chuàng)建Channel Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; // String routingKey = "test.direct111"; //收不到 //5 發(fā)送 String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
代碼的區(qū)別: 一條消息只會發(fā)送在一個隊列里
創(chuàng)建一個交換機與隊列
所綁定的交換機
控制臺輸出
主題交換機
主題交換機Topic Exchange(匹配路由規(guī)則的交換機)
所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)系RouteKey中指定Topic的Queue上;
Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic;
注意:可以使用通配符進行模糊匹配
- 符號:“#” 匹配一個或者多個詞
- 符號:“” 匹配不多不少一個詞
列如:
- “log.#” 能夠匹配到 “log.info.oa”
- “log.” 能夠匹配到 “log.err”
消費端代碼
package com.xieminglu.rabbitmqapi.exchange.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; String routingKey = "user.#"; // String routingKey = "user.*"; // 1 聲明交換機 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 聲明隊列 channel.queueDeclare(queueName, false, false, false, null); // 3 建立交換機和隊列的綁定關(guān)系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數(shù):隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環(huán)獲取消息 while(true){ //獲取消息,如果沒有消息,這一步將會一直阻塞 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
生產(chǎn)端代碼
package com.xieminglu.rabbitmqapi.exchange.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4TopicExchange { public static void main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 創(chuàng)建Connection Connection connection = connectionFactory.newConnection(); //3 創(chuàng)建Channel Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //5 發(fā)送 String msg = "Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); channel.close(); connection.close(); } }
代碼的區(qū)別: 一條消息會發(fā)送在多個隊列里 消費端:
生產(chǎn)端:
控制臺輸出
并且可以同時綁定多個交換機
輸出交換機
輸出交換機Fanout Exchange(不做路由)
- 不處理路由鍵,只需要簡單的將隊列綁定到交換機上;
- 發(fā)送到交換機的消息都會被轉(zhuǎn)發(fā)到與該交換機綁定的所有隊列上;
- Fanout交換機轉(zhuǎn)發(fā)消息是最快的
消費端代碼
package com.xieminglu.rabbitmqapi.exchange.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = ""; //不設(shè)置路由鍵 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //參數(shù):隊列名稱、是否自動ACK、Consumer channel.basicConsume(queueName, true, consumer); //循環(huán)獲取消息 while(true){ //獲取消息,如果沒有消息,這一步將會一直阻塞 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
生產(chǎn)端代碼
package com.xieminglu.rabbitmqapi.exchange.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 創(chuàng)建Connection Connection connection = connectionFactory.newConnection(); //3 創(chuàng)建Channel Channel channel = connection.createChannel(); //4 聲明 String exchangeName = "test_fanout_exchange"; //5 發(fā)送 for(int i = 0; i < 10; i ++) { String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ..."; channel.basicPublish(exchangeName, "", null , msg.getBytes()); } channel.close(); connection.close(); } }
消費端:
生產(chǎn)端:
控制臺輸出
Binding-綁定
- Exchange和Exchange、Queue之間的連接關(guān)系;
- Binding中可以包含RoutingKey或者參數(shù)
Queue-消息隊列
- 消息隊列,實際存儲消息數(shù)據(jù)
- Durability:是否持久化
- Durable:是,Transient:否
- Auto delete:如選yes,代表當(dāng)最后一個監(jiān)聽被移除之后,該Queue會自動被刪除
Message-消息
- 服務(wù)器和應(yīng)用程序之間傳遞的數(shù)據(jù)
- 本質(zhì)上就是一段數(shù)據(jù),由Properties和Payload(Body)組成
- 常用屬性:delivery model、headers(自定義屬性)
Message-其他屬性
- content_type、content_encoding、priority
- correlation_id、reply_to、expiration、message_id
- Timestamp、type、user_id、app_id、cluster_id
Virtual host-虛擬主機
- 虛擬地址,用于進行邏輯隔離,最上層的消息路由
- 一個Virtual Host里面可以有若干個Exchange和Queue
- 同一個Virtual Host里面不能有相同名稱的Exchange或Queue
總結(jié)一下
交換機的概念
在沒有交換機的時候,我們的消息隊列會處理所有發(fā)給這個消息隊列的消息,然后由消費者一個一個消費這個隊列里面的消息,如果由集群的話還會分?jǐn)倢@個消息隊列的處理。只不過這里面有一個
Message acknowledgment的概念
這將會導(dǎo)致嚴(yán)重的bug——Queue中堆積的消息會越來越多
當(dāng)然一般的消息中間件都不會這么干,我們使用了交換機后,我們看到我們的三種策略,其實都可以說由交換機去找跟它所綁定的消息隊列,如果生產(chǎn)端的路由鍵不符合要求或找不到消息隊列定好的路由鍵的話就會進行其他處理。
到此這篇關(guān)于Java中RabbitMQ消息隊列的交換機詳解的文章就介紹到這了,更多相關(guān)RabbitMQ交換機內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringMVC配置多個properties文件之通配符解析
這篇文章主要介紹了SpringMVC配置多個properties文件之通配符解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09java實現(xiàn)socket從服務(wù)器連續(xù)獲取消息的示例
這篇文章主要介紹了java實現(xiàn)socket從服務(wù)器連續(xù)獲取消息的示例,需要的朋友可以參考下2014-04-04Struts1簡介和入門_動力節(jié)點Java學(xué)院整理
這篇文章主要為大家詳細介紹了Struts1簡介和入門的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-09-09Intellj?idea新建的java源文件夾不是藍色的圖文解決辦法
idea打開java項目后新建的模塊中,java文件夾需要變成藍色,這篇文章主要給大家介紹了關(guān)于Intellj?idea新建的java源文件夾不是藍色的相關(guān)資料,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2024-02-02mybatis使用foreach查詢不出結(jié)果也不報錯的問題
這篇文章主要介紹了mybatis使用foreach查詢不出結(jié)果也不報錯的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03解決BigDecimal轉(zhuǎn)long丟失精度的問題
這篇文章主要介紹了解決BigDecimal轉(zhuǎn)long丟失精度的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12Spring Boot啟動過程(六)之內(nèi)嵌Tomcat中StandardHost、StandardContext和Sta
這篇文章主要介紹了Spring Boot啟動過程(六)之內(nèi)嵌Tomcat中StandardHost、StandardContext和StandardWrapper的啟動教程詳解,需要的朋友可以參考下2017-04-04解決idea update project 更新選項消失的問題
這篇文章主要介紹了解決idea update project 更新選項消失的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01