原生Java操作兔子隊(duì)列RabbitMQ
一.前言
RabbitMQ 是一種快速、靈活、可靠的消息傳遞方式,可用于構(gòu)建分布式應(yīng)用程序、異步處理任務(wù)、實(shí)現(xiàn)消息隊(duì)列等。下面是 Java 原生操作 RabbitMQ 的一些好處和用途:
- 簡(jiǎn)單易用:RabbitMQ 提供了豐富的 Java 客戶(hù)端庫(kù),開(kāi)發(fā)者可以輕松地使用 Java 代碼進(jìn)行消息發(fā)送和接收,無(wú)需學(xué)習(xí)復(fù)雜的消息傳遞協(xié)議和 API。
- 可擴(kuò)展性強(qiáng):RabbitMQ 支持集群和分布式部署,可以輕松地實(shí)現(xiàn)橫向和縱向擴(kuò)展,以適應(yīng)不同規(guī)模和負(fù)載的應(yīng)用需求。
- 可靠性高:RabbitMQ 提供了多種消息傳遞模式,包括持久化消息、確認(rèn)機(jī)制、事務(wù)機(jī)制等,確保消息傳遞的可靠性和一致性。
- 異步處理能力:RabbitMQ 可以異步處理任務(wù),提高應(yīng)用程序的響應(yīng)速度和吞吐量,實(shí)現(xiàn)任務(wù)削峰、應(yīng)對(duì)高并發(fā)等需求。
- 可用于多種場(chǎng)景:RabbitMQ 可以用于構(gòu)建分布式應(yīng)用程序、實(shí)現(xiàn)消息隊(duì)列、異步處理任務(wù)、實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)同步等場(chǎng)景,具有廣泛的應(yīng)用場(chǎng)景和發(fā)展前景。
二.原生Java操作RabbitMQ
Ⅰ. 簡(jiǎn)單模式
1. 添加依賴(lài)
<!--rabbitmq依賴(lài)--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.0</version> </dependency>
2. 編寫(xiě)生產(chǎn)者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; //生產(chǎn)者 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建隊(duì)列,若隊(duì)列已存在則使用該隊(duì)列 /** * 參數(shù)1:隊(duì)列名 * 參數(shù)2:是否持久化,true表示MQ重啟后隊(duì)列還存在 * 參數(shù)3:是否私有化,false表示所有消費(fèi)者都可以訪(fǎng)問(wèn),true表示只有第一次訪(fǎng)問(wèn)她的消費(fèi)者才能訪(fǎng)問(wèn) * 參數(shù)4:是否自動(dòng)刪除,true表示不再使用隊(duì)列時(shí),自動(dòng)刪除 * 參數(shù)5:其他額外參數(shù) */ channel.queueDeclare("simple_queue",false,false,false,null); // 5.發(fā)送消息 String message = "hello rabbitmq"; /** * 參數(shù)1:交換機(jī)名,""表示默認(rèn)交換機(jī) * 參數(shù)2:路由鍵,簡(jiǎn)單模式就是隊(duì)列名 * 參數(shù)3:其他額外參數(shù) * 參數(shù)4:要傳遞的消息字節(jié)數(shù)組 */ channel.basicPublish("","simple_queue",null,message.getBytes()); // 6.關(guān)閉信道和連接 channel.close(); connection.close(); System.out.println("=====發(fā)送成功===="); } }
3. 編寫(xiě)消費(fèi)者
因?yàn)橄M(fèi)者不知道生產(chǎn)者什么時(shí)候發(fā)送消息過(guò)來(lái),所以消費(fèi)者需要一直監(jiān)聽(tīng)生產(chǎn)者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消費(fèi)者 */ public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽(tīng)隊(duì)列 /** * 參數(shù)1:監(jiān)聽(tīng)的隊(duì)列名 * 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。 * 參數(shù)3:Consumer的實(shí)現(xiàn)類(lèi),重寫(xiě)該類(lèi)方法表示接收到這個(gè)消息之后該如何消費(fèi)消息 */ channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息,消息為:"+message); } }); } }
Ⅱ. 工作隊(duì)列模式
與簡(jiǎn)單模式相比,工作隊(duì)列模式(Work Queue)多了一些消費(fèi)者,該模式也使用direct交換機(jī),應(yīng)用于處理消息較多的情況。特點(diǎn)如下:
- 一個(gè)隊(duì)列對(duì)應(yīng)多個(gè)消費(fèi)者。
- 一條消息只會(huì)被一個(gè)消費(fèi)者消費(fèi)。
- 消息隊(duì)列默認(rèn)采用輪詢(xún)的方式將消息平均發(fā)送給消費(fèi)者。
其實(shí)就是 簡(jiǎn)單模式plus版本。
1. 編寫(xiě)生產(chǎn)者
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建隊(duì)列,持久化隊(duì)列 channel.queueDeclare("work_queue",true,false,false,null); // 5.發(fā)送大量消息,參數(shù)3表示該消息為持久化消息,即除了保存到內(nèi)存還保存到磁盤(pán) for (int i = 0; i < 100; i++) { channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("您好,這是今天的第"+i+"條消息").getBytes(StandardCharsets.UTF_8)); } // 6.關(guān)閉資源 channel.close(); connection.close(); } }
2. 編寫(xiě)消費(fèi)者
這里使用創(chuàng)建了三個(gè)消費(fèi)者,來(lái)接收生產(chǎn)者的消息
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 監(jiān)聽(tīng)隊(duì)列,處理消息 channel.basicConsume("work_queue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("消費(fèi)者1消費(fèi)消息,消息為:"+message); } }); } }
3. 實(shí)現(xiàn)
先把三個(gè)消費(fèi)者運(yùn)行起來(lái),再運(yùn)行生產(chǎn)者,得到的消息就會(huì)輪詢(xún)均分
Ⅲ. 發(fā)布訂閱模式
在開(kāi)發(fā)過(guò)程中,有一些消息需要不同消費(fèi)者進(jìn)行不同的處理,如電商網(wǎng)站的同一條促銷(xiāo)信息需要短信發(fā)送、郵件發(fā)送、站內(nèi)信發(fā)送等。此時(shí)可以使用發(fā)布訂閱模式(Publish/Subscribe)
特點(diǎn):
- 生產(chǎn)者將消息發(fā)送給交換機(jī),交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列中。
- 工作隊(duì)列模式的交換機(jī)只能將消息發(fā)送給一個(gè)隊(duì)列,發(fā)布訂閱模式的交換機(jī)能將消息發(fā)送給多個(gè)隊(duì)列。發(fā)布訂閱模式使用fanout交換機(jī)。
1. 編寫(xiě)生產(chǎn)者
這里創(chuàng)建了三條隊(duì)列,一條是發(fā)送短信,一條是站內(nèi)信,一條是郵件隊(duì)列、
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import javax.swing.plaf.TreeUI; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * 發(fā)布訂閱者模式跟簡(jiǎn)單和工作模式不一樣,不是使用默認(rèn)的交換機(jī),而是自己創(chuàng)建fanout交換機(jī),生產(chǎn)者把消息發(fā)到交換機(jī),由交換機(jī)轉(zhuǎn)發(fā)到與之綁定的隊(duì)列 */ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建交換機(jī) /** * 參數(shù)1:交換機(jī)名 * 參數(shù)2:交換機(jī)類(lèi)型 * 參數(shù)3:交換機(jī)是否持久化 */ channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true); // 5.創(chuàng)建隊(duì)列 channel.queueDeclare("SEND_MAIL",true,false,false,null); channel.queueDeclare("SEND_MESSAGE",true,false,false,null); channel.queueDeclare("SEND_STATION",true,false,false,null); // 6.交換機(jī)綁定隊(duì)列 /** * 參數(shù)1:隊(duì)列名 * 參數(shù)2:交換機(jī)名 * 參數(shù)3:路由關(guān)鍵字,發(fā)布訂閱模式只需要寫(xiě)""即可 */ channel.queueBind("SEND_MAIL","exchange_fanout",""); channel.queueBind("SEND_MESSAGE","exchange_fanout",""); channel.queueBind("SEND_STATION","exchange_fanout",""); // 7.發(fā)送消息 for (int i = 0; i < 10; i++) { channel.basicPublish("exchange_fanout","",null, ("您好,尊敬的用戶(hù),秒殺商品活動(dòng)開(kāi)始啦:"+i).getBytes(StandardCharsets.UTF_8)); } // 8.關(guān)閉資源 channel.close(); connection.close(); } }
2. 編寫(xiě)消費(fèi)者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //短信消費(fèi)者 public class ConsumerMessage { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽(tīng)隊(duì)列 /** * 參數(shù)1:監(jiān)聽(tīng)的隊(duì)列名 * 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。 * 參數(shù)3:Consumer的實(shí)現(xiàn)類(lèi),重寫(xiě)該類(lèi)方法表示接收到這個(gè)消息之后該如何消費(fèi)消息 */ channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送短信,消息為:"+message); } }); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //郵件消費(fèi)者 public class ConsumerMail { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽(tīng)隊(duì)列 /** * 參數(shù)1:監(jiān)聽(tīng)的隊(duì)列名 * 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。 * 參數(shù)3:Consumer的實(shí)現(xiàn)類(lèi),重寫(xiě)該類(lèi)方法表示接收到這個(gè)消息之后該如何消費(fèi)消息 */ channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送郵件,消息為:"+message); } }); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //站內(nèi)信消費(fèi)者 public class ConsumerStation { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽(tīng)隊(duì)列 /** * 參數(shù)1:監(jiān)聽(tīng)的隊(duì)列名 * 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。 * 參數(shù)3:Consumer的實(shí)現(xiàn)類(lèi),重寫(xiě)該類(lèi)方法表示接收到這個(gè)消息之后該如何消費(fèi)消息 */ channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送站內(nèi)信,消息為:"+message); } }); } }
發(fā)布訂閱模式也允許多個(gè)消費(fèi)者監(jiān)聽(tīng)同一個(gè)隊(duì)列(工作模式),例如 兩個(gè)發(fā)送短信消費(fèi)者監(jiān)聽(tīng)同一個(gè)短信生產(chǎn)者,這樣短信生產(chǎn)者的消息將會(huì)被輪詢(xún)平分。
Ⅳ. 路由模式
使用發(fā)布訂閱模式時(shí),所有消息都會(huì)發(fā)送到綁定的隊(duì)列中,但很多時(shí)候,不是所有消息都無(wú)差別的發(fā)布到所有隊(duì)列中。比如電商網(wǎng)站
的促銷(xiāo)活動(dòng),雙十一大促可能會(huì)發(fā)布到所有隊(duì)列;而一些小的促銷(xiāo)活動(dòng)為了節(jié)約成本,只發(fā)布到站內(nèi)信隊(duì)列。此時(shí)需要使用路由模式
(Routing)完成這一需求。意思就是只發(fā)給與綁定相同路由關(guān)鍵字的隊(duì)列。
特點(diǎn):
- 每個(gè)隊(duì)列綁定路由關(guān)鍵字RoutingKey。
- 生產(chǎn)者將帶有RoutingKey的消息發(fā)送給交換機(jī),交換機(jī)根據(jù)RoutingKey轉(zhuǎn)發(fā)到指定隊(duì)列。路由模式使用direct交換機(jī)。
1. 編寫(xiě)生產(chǎn)者
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * 發(fā)布訂閱者模式跟簡(jiǎn)單和工作模式不一樣,不是使用默認(rèn)的交換機(jī),而是自己創(chuàng)建fanout交換機(jī),生產(chǎn)者把消息發(fā)到交換機(jī),由交換機(jī)轉(zhuǎn)發(fā)到與之綁定的隊(duì)列 */ // 生產(chǎn)者 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建交換機(jī) channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true); // 5.創(chuàng)建隊(duì)列 channel.queueDeclare("SEND_MAIL2",true,false,false,null); channel.queueDeclare("SEND_MESSAGE2",true,false,false,null); channel.queueDeclare("SEND_STATION2",true,false,false,null); // 6.交換機(jī)綁定隊(duì)列 channel.queueBind("SEND_MAIL2","exchange_routing","import"); channel.queueBind("SEND_MESSAGE2","exchange_routing","import"); channel.queueBind("SEND_STATION2","exchange_routing","import"); channel.queueBind("SEND_STATION2","exchange_routing","normal"); // 7.發(fā)送消息 channel.basicPublish("exchange_routing","import",null, "雙十一大促活動(dòng)".getBytes()); channel.basicPublish("exchange_routing","normal",null, "小心促銷(xiāo)活動(dòng)".getBytes()); // 8.關(guān)閉資源 channel.close(); connection.close(); } }
2. 編寫(xiě)消費(fèi)者
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //發(fā)送郵件消費(fèi)者 public class ConsumerMail { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽(tīng)隊(duì)列 /** * 參數(shù)1:監(jiān)聽(tīng)的隊(duì)列名 * 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。 * 參數(shù)3:Consumer的實(shí)現(xiàn)類(lèi),重寫(xiě)該類(lèi)方法表示接收到這個(gè)消息之后該如何消費(fèi)消息 */ channel.basicConsume("SEND_MAIL2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送郵件,消息為:"+message); } }); } }
//發(fā)送信息消費(fèi)者 public class ConsumerMessage { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽(tīng)隊(duì)列 /** * 參數(shù)1:監(jiān)聽(tīng)的隊(duì)列名 * 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。 * 參數(shù)3:Consumer的實(shí)現(xiàn)類(lèi),重寫(xiě)該類(lèi)方法表示接收到這個(gè)消息之后該如何消費(fèi)消息 */ channel.basicConsume("SEND_MESSAGE2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送短信,消息為:"+message); } }); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; //站內(nèi)信消費(fèi)者 public class ConsumerStation { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.創(chuàng)建信道 Channel channel = connection.createChannel(); // 4.監(jiān)聽(tīng)隊(duì)列 /** * 參數(shù)1:監(jiān)聽(tīng)的隊(duì)列名 * 參數(shù)2:是否自動(dòng)簽收,如果為false,則需要手動(dòng)確認(rèn)消息已收到,否則MQ會(huì)一直發(fā)送消息。 * 參數(shù)3:Consumer的實(shí)現(xiàn)類(lèi),重寫(xiě)該類(lèi)方法表示接收到這個(gè)消息之后該如何消費(fèi)消息 */ channel.basicConsume("SEND_STATION2",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("發(fā)送站內(nèi)信,消息為:"+message); } }); } }
Ⅴ. 通配符模式
通配符模式(Topic)是在路由模式的基礎(chǔ)上,給隊(duì)列綁定帶通配符的路由關(guān)鍵字,只要消息的RoutingKey能實(shí)現(xiàn)通配符匹配,就會(huì)將消
息轉(zhuǎn)發(fā)到該隊(duì)列。通配符模式比路由模式更靈活,使用topic交換機(jī)。
通配符規(guī)則:
1 消息設(shè)置RoutingKey時(shí),RoutingKey由多個(gè)單詞構(gòu)成,中間以 . 分割。
2 隊(duì)列設(shè)置RoutingKey時(shí), # 可以匹配任意多個(gè)單詞, * 可以匹配任意一個(gè)單詞。
1. 編寫(xiě)生產(chǎn)者
package com.itbz.mq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 發(fā)布訂閱者模式跟簡(jiǎn)單和工作模式不一樣,不是使用默認(rèn)的交換機(jī),而是自己創(chuàng)建fanout交換機(jī),生產(chǎn)者把消息發(fā)到交換機(jī),由交換機(jī)轉(zhuǎn)發(fā)到與之綁定的隊(duì)列 */ // 生產(chǎn)者 public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1.創(chuàng)建連接工廠(chǎng) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("120.79.50.65"); connectionFactory.setPort(5672); connectionFactory.setUsername("lion"); connectionFactory.setPassword("lion"); connectionFactory.setVirtualHost("/"); // 2.創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); // 3.建立信道 Channel channel = connection.createChannel(); // 4.創(chuàng)建交換機(jī) channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true); // 5.創(chuàng)建隊(duì)列 channel.queueDeclare("SEND_MAIL3",true,false,false,null); channel.queueDeclare("SEND_MESSAGE3",true,false,false,null); channel.queueDeclare("SEND_STATION3",true,false,false,null); // 6.交換機(jī)綁定隊(duì)列 channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#"); channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#"); channel.queueBind("SEND_STATION3","exchange_topic","#.station.#"); // 7.發(fā)送消息 // 三個(gè)隊(duì)列都匹配上了 channel.basicPublish("exchange_topic","mail.message.station",null, "雙十一大促活動(dòng)".getBytes()); // 只發(fā)給station channel.basicPublish("exchange_topic","station",null, "小心促銷(xiāo)活動(dòng)".getBytes()); // 8.關(guān)閉資源 channel.close(); connection.close(); } }
2. 編寫(xiě)消費(fèi)者
跟前面差不多
三.總結(jié)
這篇萬(wàn)字長(zhǎng)文總結(jié)了原生Java操作RabbitMQ的各種過(guò)程,希望對(duì)您有幫助哦!
到此這篇關(guān)于原生Java操作兔子隊(duì)列RabbitMQ的文章就介紹到這了,更多相關(guān)原生Java操作RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Netty分布式ByteBuf中PooledByteBufAllocator剖析
這篇文章主要為大家介紹了Netty分布式ByteBuf剖析PooledByteBufAllocator簡(jiǎn)述,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03JsonFormat與@DateTimeFormat注解實(shí)例解析
這篇文章主要介紹了JsonFormat與@DateTimeFormat注解實(shí)例解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12SpringCloud修改Feign日志記錄級(jí)別過(guò)程淺析
OpenFeign源于Netflix的Feign,是http通信的客戶(hù)端。屏蔽了網(wǎng)絡(luò)通信的細(xì)節(jié),直接面向接口的方式開(kāi)發(fā),讓開(kāi)發(fā)者感知不到網(wǎng)絡(luò)通信細(xì)節(jié)。所有遠(yuǎn)程調(diào)用,都像調(diào)用本地方法一樣完成2023-02-02解決fastjson泛型轉(zhuǎn)換報(bào)錯(cuò)的解決方法
這篇文章主要介紹了解決fastjson泛型轉(zhuǎn)換報(bào)錯(cuò)的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11談?wù)凧ava利用原始HttpURLConnection發(fā)送POST數(shù)據(jù)
這篇文章主要給大家介紹java利用原始httpUrlConnection發(fā)送post數(shù)據(jù),設(shè)計(jì)到httpUrlConnection類(lèi)的相關(guān)知識(shí),感興趣的朋友跟著小編一起學(xué)習(xí)吧2015-10-10全網(wǎng)最新Log4j?漏洞修復(fù)和臨時(shí)補(bǔ)救方法
Apache?Log4j?遠(yuǎn)程代碼執(zhí)行漏洞,如何快速修復(fù)log4j2漏洞,本文給大家介紹下Log4j?漏洞修復(fù)和臨時(shí)補(bǔ)救方法,感興趣的朋友跟隨小編一起看看吧2021-12-12使用Feign調(diào)用時(shí)添加驗(yàn)證信息token到請(qǐng)求頭方式
這篇文章主要介紹了使用Feign調(diào)用時(shí)添加驗(yàn)證信息token到請(qǐng)求頭方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03