Java中間消息件ActiveMQ使用實例
先來說一說我們?yōu)槭裁匆眠@個東西??!
比如,我們現(xiàn)在有這樣了個問題要解決:
這樣,我們就要用到中間消息間了
然后我們就說一下什么是中間消息間吧。
采用消息傳送機制/消息隊列 的中間件技術(shù),進行數(shù)據(jù)交流,用在分布式系統(tǒng)的集成。
Java中對Jms有了定義,這是Java消息的統(tǒng)一接口。什么是ActiveMq呢?這是這個接口的一種實現(xiàn),相當于數(shù)據(jù)庫連接驅(qū)動一樣,不同廠商有自己不同的實現(xiàn),我們盡快看怎么用代碼實現(xiàn)吧。
消息一共有兩種接收和發(fā)送形式:點對點和發(fā)布定閱模式,也就是“一對一”和“一對多”。
1.導(dǎo)包(maven):
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
2.開始寫類,提供者(發(fā)送者)和消費者(接收者)是兩個不同的項目,我們先創(chuàng)建普通的maven項目,而不是web項目點對點的方式(消息只能被消費一次,如果同時有多個消費者,誰先搶到就是誰的)
消息提供者
public static void main(String[] args) throws JMSException { //創(chuàng)建連接工廠,這個參數(shù)就是自己的activeMQ的地址 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.180:61616"); //2.創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session(會話對象) /* arg0 是否啟用事務(wù) arg1 消息的確認方式 自動確認 */ Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5.創(chuàng)建一個隊列對象,名稱 Queue firstQueue = session.createQueue("firstQueue"); //6.創(chuàng)建一個消息的生產(chǎn)者對象 // Destination destination = ;//目標對象 MessageProducer producer = session.createProducer(firstQueue); //7.創(chuàng)建一個消息 TextMessage textMessage = session.createTextMessage("歡迎來到奇的天喻軟件"); //8.發(fā)送消息 producer.send(textMessage); //9.關(guān)閉資源 producer.close(); session.close(); connection.close(); }
消息消費者
前幾步是一樣的,都是創(chuàng)建連接,只有第6步不一樣,創(chuàng)建的是一個消費者
public static void main(String[] args) throws JMSException, IOException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.180:61616"); //2.創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session(會話對象) /* arg0 是否啟用事務(wù) arg1 消息的確認方式 自動確認 */ Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5.創(chuàng)建一個隊列對象,名稱 Queue firstQueue = session.createQueue("firstQueue"); //6.創(chuàng)建消息消費者對象 MessageConsumer consumer = session.createConsumer(firstQueue); //7.設(shè)置監(jiān)聽 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("提取的消息是"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.等待鍵盤輸入 //目的是為了讓程序停止來看效果 System.in.read(); //9.關(guān)閉資源 consumer.close(); session.close(); connection.close(); }
發(fā)布訂閱模式(發(fā)布消息后,只有在之前運行的消費者才能收到,消息被任何一個消費者消費后,以后啟動的消費者不能消費之前的消息)
消息提供者
//創(chuàng)建連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.180:61616"); //2.創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session(會話對象) /* arg0 是否啟用事務(wù) arg1 消息的確認方式 自動確認 */ Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5 Topic topic = session.createTopic("first-topic"); //6.創(chuàng)建一個消息的生產(chǎn)者對象 // Destination destination = ;//目標對象 MessageProducer producer = session.createProducer(topic); //7.創(chuàng)建一個消息 TextMessage textMessage = session.createTextMessage("歡迎來到奇的天喻軟件"); //8.發(fā)送消息 producer.send(textMessage); //9.關(guān)閉資源 producer.close(); session.close(); connection.close();
消費者
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.180:61616"); //2.創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.獲取session(會話對象) /* arg0 是否啟用事務(wù) arg1 消息的確認方式 自動確認 */ Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); //5 Topic topic = session.createTopic("first-topic"); //6.創(chuàng)建消息消費者對象 MessageConsumer consumer = session.createConsumer(topic); //7.設(shè)置監(jiān)聽 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("提取的消息是"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.等待鍵盤輸入 //目的是為了讓程序停止來看效果 System.in.read(); //9.關(guān)閉資源 consumer.close(); session.close(); connection.close();
總結(jié),是不是發(fā)現(xiàn)上邊代碼都很相似,那么完全可以用Spring來管理了啊
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
ByteArrayOutputStream簡介和使用_動力節(jié)點Java學(xué)院整理
ByteArrayOutputStream 是字節(jié)數(shù)組輸出流。它繼承于OutputStream。這篇文章主要介紹了ByteArrayOutputStream簡介和使用,需要的朋友可以參考下2017-05-05Java 定時器(Timer,TimerTask)詳解及實例代碼
這篇文章主要介紹了 Java 定時器(Timer,TimerTask)詳解及實例代碼的相關(guān)資料,需要的朋友可以參考下2017-01-01Java?Kryo,Protostuff,Hessian序列化方式對比
這篇文章主要介紹了Java?Kryo,Protostuff,Hessian序列化方式對比,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-07-07Java中BigDecimal的加減乘除、比較大小與使用注意事項
對于不需要任何準確計算精度的數(shù)字可以直接使用float或double,但是如果需要精確計算的結(jié)果,則必須使用BigDecimal類,而且使用BigDecimal類也可以進行大數(shù)的操作,下面這篇文章給大家介紹了Java中BigDecimal的加減乘除、比較大小與使用注意事項,需要的朋友可以參考下。2017-11-11