消息中間件ActiveMQ的簡單入門介紹與使用
一、什么是消息中間件
消息中間件顧名思義實現的就是在兩個系統(tǒng)或兩個客戶端之間進行消息傳送

二、什么是ActiveMQ
ActiveMQ是一種開源的基于JMS(Java Message Servie)規(guī)范的一種消息中間件的實現,ActiveMQ的設計目標是提供標準的,面向消息的,能夠跨越多語言和多系統(tǒng)的應用集成消息通信中間件。
三、什么時候需要用ActiveMQ
ActiveMQ常被應用與系統(tǒng)業(yè)務的解耦,異步消息的推送,增加系統(tǒng)并發(fā)量,提高用戶體驗。例如以我在工作中的使用,在比較耗時且異步的遠程開鎖操作時

四、如何使用ActiveMQ
1.AcitveMQ的數據傳送流程

2.ActiveMQ的兩種消息傳遞類型
(1)點對點傳輸,即一個生產者對應一個消費者,生產者向broke推送數據,數據存儲在broke的一個隊列中,當消費者接受該條隊列里的數據。
(2)基于發(fā)布/訂閱模式的傳輸,即根據訂閱話題來接收相應數據,一個生產者可向多個消費者推送數據,與MQTT協(xié)議的實現是類似的,對MQTT協(xié)議有興趣的可跳轉文末查看
兩種消息傳遞類型的不同,點對點傳輸消費者可以接收到在連接之前生產者所推送的數據,而基于發(fā)布/訂閱模式的傳輸方式消費者只能接收到連接之后生產者推送的數據。
3.ActiveMQ的安裝與啟動
(1)官網下載對應服務器版本

(2)解壓后進入apache-activemq-5.15.9/bin目錄
(3)執(zhí)行./activemq start啟動ActiveMQ

(4)瀏覽器輸入ActiveMQ啟動的服務器ip:8161便可進入web界面,點擊Manage ActiveMQ broker可以查看消息推送的狀態(tài),默認賬號密碼為admin,admin

(5)啟動錯誤分析
進入/root/apache-activemq-5.15.9/data目錄查看activemq.log文件,根據錯誤提示信息修改,例如端口號被占用等。
4.ActiveMQ的代碼測試
(1)構建maven項目,引入依賴
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
(2)生產者類
/**
* @Description 生產者
* @Date 2019/7/20
* @Created by yqh
*/
public class MyProducer {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
// 打開連接
connection.start();
// 創(chuàng)建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據
Destination destination = session.createQueue("myQueue");
// 創(chuàng)建一個生產者
MessageProducer producer = session.createProducer(destination);
// 向隊列推送10個文本消息數據
for (int i = 1 ; i <= 10 ; i++){
// 創(chuàng)建文本消息
TextMessage message = session.createTextMessage("第" + i + "個文本消息");
//發(fā)送消息
producer.send(message);
//在本地打印消息
System.out.println("已發(fā)送的消息:" + message.getText());
}
//關閉連接
connection.close();
}
}
運行結果:
已發(fā)送的消息:第1個文本消息
已發(fā)送的消息:第2個文本消息
已發(fā)送的消息:第3個文本消息
已發(fā)送的消息:第4個文本消息
已發(fā)送的消息:第5個文本消息
已發(fā)送的消息:第6個文本消息
已發(fā)送的消息:第7個文本消息
已發(fā)送的消息:第8個文本消息
已發(fā)送的消息:第9個文本消息
已發(fā)送的消息:第10個文本消息
測試查看web后臺顯示,有10條消息在隊列中等待消費

(3)消費者類
/**
* @Description 消費者類
* @Date 2019/7/20 0020
* @Created by yqh
*/
public class MyConsumer {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
// 打開連接
connection.start();
// 創(chuàng)建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據
Destination destination = session.createQueue("myQueue");
// 創(chuàng)建消費者
MessageConsumer consumer = session.createConsumer(destination);
// 創(chuàng)建消費的監(jiān)聽
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
測試結果:
消費的消息:第1個文本消息
消費的消息:第2個文本消息
消費的消息:第3個文本消息
消費的消息:第4個文本消息
消費的消息:第5個文本消息
消費的消息:第6個文本消息
消費的消息:第7個文本消息
消費的消息:第8個文本消息
消費的消息:第9個文本消息
消費的消息:第10個文本消息
web后臺顯示有一個消費者處于連接狀態(tài),且已消費了10個message,而該條隊列已沒有message待消費了

(4)當我們運行兩個消費者類,消息又是怎么被消費的呢?是兩個消費者都能收到生產者生產的message,還是只有其中一個消費者能消費呢?
我們先運行兩個消費者,在運行一個生產者對目標隊列生產10個message,會發(fā)現有以下情況
// Consumer1控制臺
消費的消息:第1個文本消息
消費的消息:第3個文本消息
消費的消息:第5個文本消息
消費的消息:第7個文本消息
消費的消息:第9個文本消息
// Consumer2控制臺
消費的消息:第2個文本消息
消費的消息:第4個文本消息
消費的消息:第6個文本消息
消費的消息:第8個文本消息
消費的消息:第10個文本消息
即隊列中的數據會平均的分給每一個消費者消費,且每一條數據只能被消費一次
(5)以上是基于隊列點對點的傳輸類型,以下是基于發(fā)布/訂閱模式傳輸的類型測試
/**
* @Description 基于發(fā)布/訂閱模式傳輸類型的生產者測試
* @Date 2019/7/20 0020
* @Created by yqh
*/
public class MyProducerForTopic {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
// 打開連接
connection.start();
// 創(chuàng)建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據
Destination destination = session.createTopic("topicTest");
// 創(chuàng)建一個生產者
MessageProducer producer = session.createProducer(destination);
// 向隊列推送10個文本消息數據
for (int i = 1 ; i <= 10 ; i++){
// 創(chuàng)建文本消息
TextMessage message = session.createTextMessage("第" + i + "個文本消息");
//發(fā)送消息
producer.send(message);
//在本地打印消息
System.out.println("已發(fā)送的消息:" + message.getText());
}
//關閉連接
connection.close();
}
}
/**
* @Description 基于發(fā)布/訂閱模式傳輸類型的消費者測試
* @Date 2019/7/20 0020
* @Created by yqh
*/
public class MyConsumerForTopic {
private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";
public static void main(String[] args) throws JMSException {
// 創(chuàng)建連接工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 創(chuàng)建連接
Connection connection = activeMQConnectionFactory.createConnection();
// 打開連接
connection.start();
// 創(chuàng)建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據
Destination destination = session.createTopic("topicTest");
// 創(chuàng)建消費者
MessageConsumer consumer = session.createConsumer(destination);
// 創(chuàng)建消費的監(jiān)聽
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
現在如果我們先啟動生產者,再啟動消費者,會發(fā)現消費者是無法接收到之前生產者之前所生產的數據,只有消費者先啟動,再讓生產者消費才可以正常接收數據,這也是發(fā)布/訂閱的主題模式與點對點的隊列模式的一個明顯區(qū)別。
而如果啟動兩個消費者,那么每一個消費者都能完整的接收到生產者生產的數據,即每一條數據都被消費了兩次,這是發(fā)布/訂閱的主題模式與點對點的隊列模式的另一個明顯區(qū)別。
淺談MQTT
1、什么是MQTT
MQTT的全稱是“ Message Queuing Telemetry Transport”,即消息隊列遙測傳輸,是一種基于訂閱/發(fā)布模式的應用層協(xié)議,而http是一種基于restful風格的一種應用層協(xié)議。
MQTT協(xié)議是一種輕量級協(xié)議,作為一種低開銷、低帶寬占用的即時通訊協(xié)議,常被應用于物聯(lián)網項目。同樣基于訂閱/發(fā)布模式的中間件有ActiveMQ,Kafka等消息中間件,歸根結底實現的都是消息的傳輸。
2、如何理解MQTT
MQTT的是一種應用層協(xié)議,每一種協(xié)議都有其適用場景,而MQTT常被應用于消息推送,消息采集。例如溫度檢測儀器定時上傳溫度、檢測礦洞氧氣濃度等。
MQTT是基于TCP/IP的一種應用層協(xié)議,TCP/IP本身已實現了在不可靠的網絡環(huán)境提供可靠的網絡傳輸的功能,而MQTT協(xié)議也有其保障消息可靠傳輸的策略。
MQTT推送的消息有三種消息質量
1.至多一次,即消息只推送一次,至于消息有沒有推送成功
2.至少一次,需要確認消息到達,可能會導致收到重復數據(注:MQTT定義的重發(fā)機制與tcp的重復機制是不同的,tcp的重復機制是在限定時間內如果沒有收到對應序號的響應報文,則會重新推送該序列號對應的報文,而MQTT的重發(fā)機制是在客戶端重新建立連接時,
補發(fā)之前沒有對應響應報文的數據包,當然客戶端可以選擇是否要接收這些之前沒有傳輸成功的數據包。最開始使用netty實現MQTT服務器的時候就理解錯了,以為MQTT的重復機制與tcp的重復機制一樣)
3.只有一次,確認消息只到達一次,常用于對數據要求嚴格的場景,例如計費場景,訂單場景
3、如何使用MQTT
MQTT的客戶端和服務端目前已有成熟的開源產品,例如服務端有emqx,客戶端有Eclipse Paho Mqtt(Java),都可以方面的引入相應的庫快速的實現推送功能(具體可根據需求查看對應的API)。
本質上來將是客戶端與服務端建立一個Socket,然后根據MQTT協(xié)議規(guī)定發(fā)送響應的報,例如建立socket后發(fā)送connet報文去建立連接,然后服務器會解析該連接報文,并保存該連接的相關信息。
我們可以把MQTT協(xié)議的規(guī)定當成是我們實現web項目中所實現的業(yè)務邏輯。
4、MQTT協(xié)議的相關的名詞解析
1.訂閱(Subscription)
訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯(lián)。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。
2.、會話(Session)
每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態(tài)交互。會話存在于一個網絡之間,也可能在客戶端和服務器之間跨越多個連續(xù)的網絡連接。
3.主題名(Topic Name)
連接到一個應用程序消息的標簽,該標簽與服務器的訂閱相匹配。服務器會將消息發(fā)送給訂閱所匹配標簽的每個客戶端。
4.主題篩選器(Topic Filter)
一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。
5.負載(Payload)
消息訂閱者所具體接收的內容。
總結
到此這篇關于ActiveMQ的簡單入門介紹與使用的文章就介紹到這了,更多相關ActiveMQ介紹與使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java高級之HashMap中的entrySet()方法使用
這篇文章主要介紹了Java高級之HashMap中的entrySet()方法使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03
Spring類型轉換 ConversionSerivce Convertor解析
這篇文章主要介紹了Spring類型轉換 ConversionSerivce Convertor的相關資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-11-11
Linux環(huán)境下的Java(JDBC)連接openGauss數據庫實踐記錄
這篇文章主要介紹了Linux環(huán)境下的Java(JDBC)連接openGauss數據庫實踐記錄,需要的朋友可以參考下2022-11-11
解析SpringBoot整合SpringDataRedis的過程
這篇文章主要介紹了SpringBoot整合SpringDataRedis的過程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-06-06
并發(fā)編程之Java內存模型volatile的內存語義
這篇文章主要介紹了并發(fā)編程之Java內存模型volatile的內存語義,理解volatile特性的一個好辦法是把對volatile變量的單個讀/寫,看成是使用同一個鎖對單個讀/寫操作做了同步。下面我們一起進入文章看看具體例子吧,需要的小伙伴可以參考下2021-11-11
SpringBoot使用JavaMailSender實現發(fā)送郵件+Excel附件
項目審批完畢后,需要發(fā)送郵件通知相關人員,并且要附帶數據庫表生成的Excel表格,這就要求不光是郵件發(fā)送功能,還要臨時生成Excel表格做為附件,本文詳細介紹了SpringBoot如何使用JavaMailSender實現發(fā)送郵件+Excel附件,需要的朋友可以參考下2023-10-10

