Java消息隊列RabbitMQ入門詳解
主流中間件對比
ActiveMQ 是 Apache 出品,最流行的,能力強勁的開源消息總線,并且它一 個完全支持 J M S 規(guī)范的消息中間件。
其豐富的 API 、多種集群構(gòu)建模式使得他成為業(yè)界老牌消息中間件,在中 小型企業(yè)中應(yīng)用廣泛!
MQ 衡量指標(biāo):服務(wù)性能、數(shù)據(jù)存儲、集群架構(gòu)
Kafka
RocketMQ是阿里開源的消息中間件,目前也已經(jīng)孵化為Apache頂級項目, 它是純java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng) 應(yīng)用的特點。
RocketMQ思路起源于Kafka,它對消息的可靠傳輸及事務(wù) 性做了優(yōu)化, 目前在阿里集團被廣泛應(yīng)用于交易、充值、流計算、消息推 送、日志流式處理、binglog分發(fā)等場景
RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP協(xié)議 來實現(xiàn)。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布 /訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi), 對數(shù)據(jù)_致 性、穩(wěn)定性和可靠性要求很髙的場景,對性能和吞吐量的要求還在其次。
結(jié)論:
- activiMq老牌消息中間件,api全面,但是吞吐量不大
- Kafaka吞吐量大,但是數(shù)據(jù)無法保證不丟失,主要面向大數(shù)據(jù)
- rokectMQ:吞吐量大,保證數(shù)據(jù)不丟失,并且支持分布式事物,但是商業(yè)版需要收費
- rabbitMQ:吞吐量大,數(shù)據(jù)不易丟失
初識RabbitMQ
RabbitMQ是—個開源的消息代理和隊列服務(wù)器,用來通過普通協(xié)議 在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ是使用Erlang語言來編寫 的,并且RabbitMQ是基于AMQP協(xié)議的。
哪些大廠在用RabbitMQ,為什幺?
- 滴滴、美團、頭條、去哪兒、藝龍…
- 開源、性能優(yōu)秀,穩(wěn)定性保障
- 提供可靠性消息投遞模式(confirm)、返回模式(return )
- 與SpringAMQP完美的整合、API豐富
- 集群模式豐富,表達式配置,HA模式,鏡像隊列模型
- 保證數(shù)據(jù)不丟失的前提做到高可靠性、可用性
RabbitMQ高性能的原因?
Erlang語言最初在于交換機領(lǐng)域的架構(gòu)模式,這樣使得 RabbitMQ在Broker之間進行數(shù)據(jù)交互的性能是非常優(yōu)秀的
Erlang的優(yōu)點:Erlang有著和原生Socket—樣的延遲
什么是AMQP高級消息隊列協(xié)議?
- AMQP定義: 是具有現(xiàn)代特征的二進制協(xié)議;
- 是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準高級消息隊列協(xié)議;
- 是應(yīng)用層協(xié)議的一個開放標(biāo)準,為面向消息的中間件設(shè)計;
AMQP核心概念(重點)
- Server:又稱Broker,接受客戶端的連接,實現(xiàn)AMQP實體服務(wù)
- Connection:連接:應(yīng)用程序與Broker的網(wǎng)絡(luò)連接
- Channel:網(wǎng)絡(luò)通道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道;客戶端可建立多個Channel,每個Channel代表一個會話任務(wù);
- Message:消息,服務(wù)器與應(yīng)用程序之間傳遞的數(shù)據(jù),由Properties和Body組成。Properties可以對消息進行裝飾,比如消息的優(yōu)先級、延遲等高級特性;Body則就是消息體內(nèi)容;
- Virtual host:虛擬地址,用于進行邏輯隔離,最上層的消息路由;一個Virtual Host里面可以有若干個Exchange和Queue,同一個Virtual Host里面不能有相同名稱的Exchange或Queue;
- Exchange:交換機,交換消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊列;
- Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key;
- Routing key:一個路由規(guī)則,虛擬機可用它來確定如何路由一個特定消息
- Queue:也稱為Message Queue,消息隊列,保存消息并將它們轉(zhuǎn)發(fā)給消費者
RabbitMQ安裝及使用
Centos安裝方式
注意:Erlang語言與RabbitMQ安裝版本必須匹配
RabbitMQ官網(wǎng)安裝
官網(wǎng)地址:https://www.rabbitmq.com/
提前準備:安裝Linux必要依賴包
下載RabbitMQ必須安裝包
配置文件修改
服務(wù)的啟動:rabbitmq-server start &
服務(wù)的停止:rabbitmqctl stop_app
管理插件:rabbitmq-plugins enable rabbitmq_management
訪問地址://ip:15672/
詳細步驟
- 準備:
yum install \ build-essential openssl openssl-devel unixODBC unixODBC-devel \ make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz -y
- 下載:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
- 安裝:
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --nodeps --force rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
- 配置文件:
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
- 比如修改密碼、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
- 服務(wù)啟動
rabbitmq-server start &
- 服務(wù)停止
rabbitmqctl app_stop
- 查看服務(wù)是否成功:
yum install lsof lsof -i:5672
- 管理插件:
rabbitmq-plugins enable rabbitmq_management
- 訪問地址:
http://192.168.147.146:15672/
Docker安裝方式
注意獲取鏡像的時候要獲取management版本的,不要獲取last版本的,management版本的才帶有管理界面
1.查詢鏡像
docker search rabbitmq:management
2.獲取鏡像
docker pull rabbitmq:management
3.運行鏡像
- 方式一:
默認guest用戶,密碼也是guest docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
- 方式二:
設(shè)置用戶名和密碼
docker run -d –name my-rabbitmq -p 5672:5672 -p 15672:15672 -v /data:/var/lib/rabbitmq –hostname my-rabbitmq-host -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin –restart=always rabbitmq:management
參數(shù)說明:
- -d:后臺運行容器
- -name:指定容器名
- -p:指定服務(wù)運行的端口(5672:應(yīng)用訪問端口;15672:控制臺Web端口號)
- -v:映射目錄或文件,啟動了一個數(shù)據(jù)卷容器,數(shù)據(jù)卷路徑為:/var/lib/rabbitmq,再將此數(shù)據(jù)卷映射到住宿主機的/data目錄
- –hostname:主機名(RabbitMQ的一個重要注意事項是它根據(jù)所謂的 “節(jié)點名稱” 存儲數(shù)據(jù),默認為主機名)
- -e:指定環(huán)境變量;(RABBITMQ_DEFAULT_VHOST:默認虛擬機名;RABBITMQ_DEFAULT_USER:默認的用戶名;RABBITMQ_DEFAULT_PASS:默認用戶名的密碼)
- –restart=always:當(dāng)Docker重啟時,容器能自動啟動
- rabbitmq:management:鏡像名
注1:RABBITMQ_DEFAULT_VHOST=my_vhost,my_vhost名字請記好,在之后的編程中要用到, 如果啟動時沒指定,默認值為/
4.進入RabbitMQ管理平臺進行相關(guān)操作
- 注1:容器啟動后,可以通過docker logs 窗口ID/容器名字 查看日志 docker logs my-rabbitmq
- 注2:停止并刪除所有容器 docker stop $(docker ps -aq) && docker rm $(docker ps -aq)
常用操作命令
命令行與管控臺-基礎(chǔ)操作
- rabbitmqctl stop_app:關(guān)閉應(yīng)用
- rabbitmqctl start_app:啟動應(yīng)用
- rabbitmqctl status:節(jié)點狀態(tài)
- rabbitmqctl add_user username password:添加用戶
- rabbitmqctl list_users:列出所有用戶
- rabbitmqctl delete_user username:刪除用戶
- rabbitmqctl clear_permissions -p vhostpath username:清除用戶權(quán)限
- rabbitmqctl list_user_permissions username:列出用戶權(quán)限
- rabbitmqctl change_password username newpassword:修改密碼
- rabbitmqctl set_permissions -p vhostpath username “.” “.” “.*”
- rabbitmqctl add_vhost vhostpath:創(chuàng)建虛擬主機
- rabbitmqctl list_vhosts:列出所有虛擬主機
- rabbitmqctl list_permissions -p vhostpath:列出虛擬主機上所有權(quán)限
- rabbitmqctl delete_vhost vhostpath:刪除虛擬主機
- rabbitmqctl list_queues:查看所有隊列信息
- rabbitmqctl -p vhostpath purge_queue blue:清除隊列里的消息
命令行與管控臺-高級操作
- rabbitmqctl reset:移除所有數(shù)據(jù),要在rabbitmqctl stop_app之后使用
- rabbitmqctl join_cluster [–ram]:組成集群命令
- rabbitmqctl cluster_status:查看集群狀態(tài)
- rabbitmqctl change_cluster_node_type disc | ram:修改集群節(jié)點的存儲形式
- rabbitmqctl forget_cluster_node {–offline} 忘記節(jié)點 (摘除節(jié)點)
- rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2…] (修改節(jié)點名稱)
RabbitMQ快速入門
極速入門-消息生產(chǎn)與消費
- ConnectionFactory:獲取連接工廠
- Connection:一個鏈接
- Channel:數(shù)據(jù)通信通道,課發(fā)送和接收消息
- Queue:具體的消息存儲隊列
- Producer & Consumer:生產(chǎn)和消費者
創(chuàng)建一個springboot項目: rabbitmq-api
導(dǎo)入pom依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
消費端代碼
package com.xieminglu.rabbitmqapi.quickstart; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; public class Consumer { public static void main(String[] args) throws Exception { //1 創(chuàng)建一個ConnectionFactory, 并進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通過連接工廠創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); //3 通過connection創(chuàng)建一個Channel Channel channel = connection.createChannel(); //4 聲明(創(chuàng)建)一個隊列 String queueName = "test001"; // 參數(shù):隊列名稱、持久化與否、獨占與否、無消息隊列是否自動刪除、消息參數(shù) // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) channel.queueDeclare(queueName, true, false, false, null); //5 創(chuàng)建消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6 設(shè)置Channel // 參數(shù):隊列名稱、自動簽收、消費者回調(diào) // basicConsume(String queue, boolean autoAck, Consumer callback) channel.basicConsume(queueName, true, queueingConsumer); while(true){ //7 獲取消息(Delivery:傳送) QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費端: " + msg); //Envelope envelope = delivery.getEnvelope(); } } }
生產(chǎn)端
package com.xieminglu.rabbitmqapi.quickstart; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Procuder { public static void main(String[] args) throws Exception { //1 創(chuàng)建一個ConnectionFactory, 并進行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通過連接工廠創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); //3 通過connection創(chuàng)建一個Channel Channel channel = connection.createChannel(); //4 通過Channel發(fā)送數(shù)據(jù) for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", null, msg.getBytes()); } //5 記得要關(guān)閉相關(guān)的連接 channel.close(); connection.close(); } }
到此這篇關(guān)于Java消息隊列RabbitMQ入門詳解的文章就介紹到這了,更多相關(guān)RabbitMQ入門內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?Boot?中的?@HystrixCommand?注解原理及使用方法
通過使用 @HystrixCommand 注解,我們可以輕松地實現(xiàn)對方法的隔離和監(jiān)控,從而提高系統(tǒng)的可靠性和穩(wěn)定性,本文介紹了Spring Boot 中的@HystrixCommand注解是什么,其原理以及如何使用,感興趣的朋友跟隨小編一起看看吧2023-07-07jsp、servlet前后端交互對數(shù)據(jù)處理及展示的簡單實現(xiàn)
Servlet和JSP是Java Web開發(fā)中的兩個重要概念,在Servlet和JSP中前后端交互可以通過一些方式來實現(xiàn),這篇文章主要給大家介紹了關(guān)于jsp、servlet前后端交互對數(shù)據(jù)處理及展示的簡單實現(xiàn),需要的朋友可以參考下2023-12-12Mybatis實戰(zhàn)之TypeHandler高級進階
本文主要介紹了自定義的枚舉TypeHandler的相關(guān)知識,具有很好的參考價值,下面跟著小編一起來看下吧2017-02-02解決mybatis-plus3.4.1分頁插件PaginationInterceptor和防止全表更新與刪除插件SqlE
這篇文章給大家介紹了在Spring.xml文件中配置mybatis-plus3.4.1分頁插件PaginationInterceptor和防止全表更新與刪除插件SqlExplainInterceptor過時失效問題解決方案,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2020-12-12springboot下實現(xiàn)RedisTemplate?List?清空
我們經(jīng)常會使用Redis的List數(shù)據(jù)結(jié)構(gòu)來存儲一系列的元素,當(dāng)我們需要清空一個List時,可以使用RedisTemplate來實現(xiàn),本文就來詳細的介紹一下如何實現(xiàn),感興趣的可以了解一下2024-01-01SpringBoot混合使用StringRedisTemplate和RedisTemplate的坑及解決
這篇文章主要介紹了SpringBoot混合使用StringRedisTemplate和RedisTemplate的坑及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12