docker啟動rabbitmq以及使用方式詳解
搜索rabbitmq鏡像
docker search rabbitmq:management
下載鏡像
docker pull rabbitmq:management
啟動容器
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management
打印容器
docker logs rabbitmq
訪問RabbitMQ Management
http://localhost:15672
賬戶密碼默認:guest
編寫生產(chǎn)者類
package com.xun.rabbitmqdemo.example; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 生成一個queue隊列 * 1、隊列名稱 QUEUE_NAME * 2、隊列里面的消息是否持久化(默認消息存儲在內(nèi)存中) * 3、該隊列是否只供一個Consumer消費 是否共享 設置為true可以多個消費者消費 * 4、是否自動刪除 最后一個消費者斷開連接后 該隊列是否自動刪除 * 5、其他參數(shù) */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message = "Hello world!"; /** * 發(fā)送一個消息 * 1、發(fā)送到哪個exchange交換機 * 2、路由的key * 3、其他的參數(shù)信息 * 4、消息體 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println(" [x] Sent '"+message+"'"); channel.close(); connection.close(); } }
運行該方法,可以看到控制臺的打印
name=hello的隊列收到Message
消費者
package com.xun.rabbitmqdemo.example; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Receiver { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setConnectionTimeout(600000);//milliseconds factory.setRequestedHeartbeat(60);//seconds factory.setHandshakeTimeout(6000);//milliseconds factory.setRequestedChannelMax(5); factory.setNetworkRecoveryInterval(500); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); System.out.println("Waiting for messages. "); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
工作隊列
RabbitMqUtils工具類
package com.xun.rabbitmqdemo.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMqUtils { public static Channel getChannel() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
啟動2個工作線程
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; public class Work01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println("接收消息:"+receivedMessage); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消費者取消消費接口回調(diào)邏輯"); }; System.out.println("C1 消費者啟動等待消費...."); /** * 消費者消費消息 * 1、消費哪個隊列 * 2、消費成功后是否自動應答 * 3、消費的接口回調(diào) * 4、消費未成功的接口回調(diào) */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; public class Work02 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println("接收消息:"+receivedMessage); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消費者取消消費接口回調(diào)邏輯"); }; System.out.println("C2 消費者啟動等待消費...."); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
啟動工作線程
啟動發(fā)送線程
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.Channel; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import java.util.Scanner; public class Task01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ try(Channel channel= RabbitMqUtils.getChannel();){ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //從控制臺接收消息 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("發(fā)送消息完成:"+message); } } } }
啟動發(fā)送線程,此時發(fā)送線程等待鍵盤輸入
發(fā)送4個消息
可以看到2個工作線程按照順序分別接收message。
消息應答機制
rabbitmq將message發(fā)送給消費者后,就會將該消息標記為刪除。
但消費者在處理message過程中宕機,會導致消息的丟失。
因此需要設置手動應答。
生產(chǎn)者
import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import java.util.Scanner; public class Task02 { private static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ try(Channel channel = RabbitMqUtils.getChannel()){ channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null); Scanner scanner = new Scanner(System.in); System.out.println("請輸入信息"); while(scanner.hasNext()){ String message = scanner.nextLine(); channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes()); System.out.println("生產(chǎn)者task02發(fā)出消息"+ message); } } } }
消費者
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import com.xun.rabbitmqdemo.utils.SleepUtils; public class Work03 { private static final String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); System.out.println("Work03 等待接收消息處理時間較短"); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody()); SleepUtils.sleep(1); System.out.println("接收到消息:"+message); /** * 1、消息的標記tag * 2、是否批量應答 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消費者取消消費接口回調(diào)邏輯"); }; //采用手動應答 boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import com.xun.rabbitmqdemo.utils.SleepUtils; public class Work04 { private static final String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); System.out.println("Work04 等待接收消息處理時間較長"); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody()); SleepUtils.sleep(30); System.out.println("接收到消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消費者取消消費接口回調(diào)邏輯"); }; //采用手動應答 boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
工具類SleepUtils
package com.xun.rabbitmqdemo.utils; public class SleepUtils { public static void sleep(int second){ try{ Thread.sleep(1000*second); }catch (InterruptedException _ignored){ Thread.currentThread().interrupt(); } } }
模擬
work04等待30s后發(fā)出ack
在work04處理message時手動停止線程,可以看到message:dd被rabbitmq交給了work03
不公平分發(fā)
上面的輪詢分發(fā),生產(chǎn)者依次向消費者按順序發(fā)送消息,但當消費者A處理速度很快,而消費者B處理速度很慢時,這種分發(fā)策略顯然是不合理的。
不公平分發(fā):
int prefetchCount = 1; channel.basicQos(prefetchCount);
通過此配置,當消費者未處理完當前消息,rabbitmq會優(yōu)先將該message分發(fā)給空閑消費者。
總結(jié)
到此這篇關于docker啟動rabbitmq以及使用的文章就介紹到這了,更多相關docker啟動rabbitmq及使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
docker-compose啟動docker文件掛載失敗的解決
這篇文章主要介紹了docker-compose啟動docker文件掛載失敗的解決方案。具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案
這篇文章主要介紹了DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02解決docker中mysql時間與系統(tǒng)時間不一致問題
最近在Docker中裝mysql時,發(fā)現(xiàn)數(shù)據(jù)庫時間與系統(tǒng)時間相差8個小時。查詢資料發(fā)現(xiàn),docker的默認時區(qū)是0區(qū),其實這會對安裝的容器造成不少麻煩,比如執(zhí)行日志的記錄不準確等2021-12-12docker網(wǎng)絡,docker-compose?network問題
這篇文章主要介紹了docker網(wǎng)絡,docker-compose?network問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01docker環(huán)境搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺的教程
這篇文章主要介紹了docker下搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-07-07docker使用Dockerfile構建鏡像的實現(xiàn)示例
本文主要介紹了docker使用Dockerfile構建鏡像的實現(xiàn)示例,通過編寫 Dockerfile,您可以定義鏡像的基礎環(huán)境、安裝軟件包、復制文件、設置環(huán)境變量等操作,下面就來介紹一下2024-01-01Docker網(wǎng)絡配置及部署SpringCloud項目詳解
bridge模式是Docker默認的網(wǎng)絡設置,此模式會為每一個容器分配Network Namespace、設置IP等,并將一個主機上的Docker容器連接到一個虛擬網(wǎng)橋上,下面這篇文章主要給大家介紹了關于Docker網(wǎng)絡配置及部署SpringCloud項目的相關資料,需要的朋友可以參考下2023-01-01