Springboot整合Active消息隊列
簡單理解:
Active是Apache公司旗下的一個消息總線,ActiveMQ是一個開源兼容Java Message Service(JMS) 面向消息的中件間. 是一個提供松耦合的應用程序架構(gòu).
主要用來在服務與服務之間進行異步通信的。
一、搭建步驟
1、相應jar包
<!-- 整合消息隊列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 如果配置線程池則加入 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
2、application.properties文件
#整合jms測試,安裝在別的機器,防火墻和端口號記得開放 spring.activemq.broker-url=tcp://47.96.44.110:61616 spring.activemq.user=admin spring.activemq.password=admin #下列配置要增加依賴 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100 #集群配置(后續(xù)需要在配上) #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) #消息隊列默認是點對點的,如果需要發(fā)布/訂閱模式那么需要加上下面注解(如果同時需要點對點發(fā)布訂閱這里也需注釋掉) # spring.jms.pub-sub-domain=true
3、Springboot主類
<!-- 主類需要多加一個@EnableJms注解,不過貌似我沒有加的時候,也能運行,為安全起見姑且加上 --> @SpringBootApplication @EnableJms
4.5.......根據(jù)不同消息模式來寫了。
二、點對點案例
我在這里案例中創(chuàng)建了兩個點對點隊列,所以他會有兩個queue對象,同樣對應每個queue對象,都會有單一對應的消費者。
1、Springboot主類
@SpringBootApplication
@EnableJms
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//新建一個的Queue對象,交給sringboot管理,這個queue的名稱叫"first.queue".
@Bean
public Queue queue(){
return new ActiveMQQueue("first.queue");
}
}
2.1、first.queue對應消費者
@Component
public class FirstConsumer {
//名為"first.queue"消息隊列的消費者,通過JmsListener進行監(jiān)聽有沒有消息,有消息會立刻讀取過來
@JmsListener(destination="first.queue")
public void receiveQueue(String text){
System.out.println("FirstConsumer收到的報文為:"+text);
}
}
2.2、two.queue對應消費者(后面會創(chuàng)建)
@Component
public class TwoConsumer {
//名為"two.queue"消息隊列的消費者
@JmsListener(destination="two.queue")
public void receiveQueue(String text){
System.out.println("TwoConsumer收到的報文為:"+text);
}
}
3、Service類
/**
* 功能描述:消息生產(chǎn)
*/
public interface ProducerService {
// 功能描述:指定消息隊列,還有消息
public void sendMessage(Destination destination, final String message);
// 功能描述:使用默認消息隊列, 發(fā)送消息
public void sendMessage( final String message);
}
4、ServiceImpl實現(xiàn)類
/**
* 功能描述:消息生產(chǎn)者實現(xiàn)類
*/
@Service
public class ProducerServiceImpl implements ProducerService{
//這個隊列就是Springboot主類中bean的對象
@Autowired
private Queue queue;
//用來發(fā)送消息到broker的對象,可以理解連接數(shù)據(jù)庫的JDBC
@Autowired
private JmsMessagingTemplate jmsTemplate;
//發(fā)送消息,destination是發(fā)送到的隊列,message是待發(fā)送的消息
@Override
public void sendMessage(Destination destination, String message) {
jmsTemplate.convertAndSend(destination, message);
}
//發(fā)送消息,queue是發(fā)送到的隊列,message是待發(fā)送的消息
@Override
public void sendMessage(final String message) {
jmsTemplate.convertAndSend(this.queue, message);
}
}
5.QueueController類
/**
* 功能描述:點對點消息隊列控制層
*/
@RestController
@RequestMapping("/api/v1")
public class QueueController {
@Autowired
private ProducerService producerService;
// 這里后面調(diào)用的是Springboot主類的quene隊列
@GetMapping("first")
public Object common(String msg){
producerService.sendMessage(msg);
return "Success";
}
// 這個隊列是新建的一個名為two.queue的點對點消息隊列
@GetMapping("two")
public Object order(String msg){
Destination destination = new ActiveMQQueue("two.queue");
producerService.sendMessage(destination, msg);
return "Success";
}
}
6、案例演示:

從演示效果可以得出以下結(jié)論:
1:當springboot啟動時候,就生成了這兩個隊列,而且他們都會有一個消費者
2:當我通過頁面訪問的時候,就相當于生產(chǎn)者把消息放到隊列中,一旦放進去就會被消費者監(jiān)聽到,就可以獲取生產(chǎn)者放進去的值并在后臺打印出
順便對頁面中四個單詞進行解釋:
Number Of Pending Messages :待處理消息的數(shù)量。我們每次都會被監(jiān)聽處理掉,所以不存在待處理,如果存在就說這里面哪里出故障了,需要排查
Number Of Consumers : 消費者數(shù)量
Messages Enqueued: 消息排列,這個只增不見,代表已經(jīng)處理多少消息
Messages Dequeued: 消息出隊。
三、發(fā)布/訂閱者模式
在上面點對點代碼的基礎上,添加發(fā)布/訂閱相關代碼
1.appliaction.properties文件
#消息隊列默認是點對點的,如果需要發(fā)布/訂閱模式那么需要加上下面注解(如果同時需要點對點發(fā)布訂閱這里也需注釋掉) spring.jms.pub-sub-domain=true
2.Springboot主類添加
//新建一個topic隊列
@Bean
public Topic topic(){
return new ActiveMQTopic("video.topic");
}
3.添加多個消費者類
//這里定義了三個消費者
@Component
public class TopicSub {
@JmsListener(destination="video.topic")
public void receive1(String text){
System.out.println("video.topic 消費者:receive1="+text);
}
@JmsListener(destination="video.topic")
public void receive2(String text){
System.out.println("video.topic 消費者:receive2="+text);
}
@JmsListener(destination="video.topic")
public void receive3(String text){
System.out.println("video.topic 消費者:receive3="+text);
}
}
4.Service類
//功能描述:消息發(fā)布者 public void publish(String msg);
5.ServiceImpl實現(xiàn)類
//=======發(fā)布訂閱相關代碼=========
@Autowired
private Topic topic;
@Override
public void publish(String msg) {
this.jmsTemplate.convertAndSend(this.topic, msg);
}
6.Controller類
// 這個隊列是新建的一個名為two.queue的點對點消息隊列
@GetMapping("topic")
public Object topic(String msg){
producerService.publish(msg);
return "Success";
}
7.演示效果:

從演示效果總結(jié)如下:
1:Springboot啟動的時候,在Topics目錄下,一共出現(xiàn)了5個消費者。first.queue一個消費者、two.queue一個消費者、video.topic三個消費者
2:當我在控制臺輸入信息后,video.topic的三個消費者都會監(jiān)聽video.topic發(fā)布的消息,并在控制臺打印。
四、如何讓點對點和發(fā)布訂閱同時有效
為什么這么說呢,因為當我向上面一樣同時開啟,會發(fā)現(xiàn)點對點模式已經(jīng)失效了。
效果演示

從演示效果,可以得出如下結(jié)論:
1:我們發(fā)現(xiàn)我們在頁面輸入..../two?msg=555消息后,后臺并沒有成功打印消息。再看Active界面發(fā)現(xiàn),這個queue對象,確實有一條待處理的消息,但是我們發(fā)現(xiàn),它對應的消費者數(shù)量是為0.
2:然而我們在打開topic頁面發(fā)現(xiàn),這里卻存在一個消費者。
所以我個人理解是,當同時啟動的時候,所產(chǎn)生的消費者默認都是Topic消費者,沒有Queue消費者,所以它監(jiān)聽不到queue所待處理的消息。
當配置文件不加:spring.jms.pub-sub-domain=true 那么系統(tǒng)會默認支持quene(點對點模式),但一旦加上這段配置,系統(tǒng)又變成只支持發(fā)布訂閱模式。
那如何同時都可以成功呢?
思路如下:
第一步:還是需要去掉配置文件中的:
#消息隊列默認是點對點的,如果需要發(fā)布/訂閱模式那么需要加上下面注解(如果同時需要點對點發(fā)布訂閱這里也需注釋掉) #spring.jms.pub-sub-domain=true
第二步:在發(fā)布訂閱者的中消費者中指定獨立的containerFactory
因為你去掉上面的配置,那么系統(tǒng)就默認是queue,所以@JmsListener如果不指定獨立的containerFactory的話是只能消費queue消息
@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
public void receive1(String text){
System.out.println("video.topic 消費者:receive1="+text);
}
@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
public void receive2(String text){
System.out.println("video.topic 消費者:receive2="+text);
}
//第三步我不添加containerFactory="jmsListenerContainerTopic"看等下是否會打印出
@JmsListener(destination="video.topic")
public void receive3(String text){
System.out.println("video.topic 消費者:receive3="+text);
}
第三步:定義獨立的topic定義獨立的JmsListenerContainer
在springboot主類中添加:
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
效果:

得出結(jié)論:
1:點對點,和發(fā)布訂閱都有用
2:receive3沒有指定獨立的containerFactory一樣沒有打印出來。
源碼
github地址:https://github.com/yudiandemingzi/springbootAcitveMQ
相關文章
java:try...catch跳過異常繼續(xù)處理循環(huán)問題
這篇文章主要介紹了java:try...catch跳過異常繼續(xù)處理循環(huán)問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
java操作mongodb時,對象bean和DBObject相互轉(zhuǎn)換的方法(推薦)
下面小編就為大家?guī)硪黄猨ava操作mongodb時,對象bean和DBObject相互轉(zhuǎn)換的方法(推薦)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-11-11
java底層AQS實現(xiàn)類ReentrantLock鎖的構(gòu)成及源碼解析
本章我們就要來學習一下第一個?AQS?的實現(xiàn)類:ReentrantLock,看看其底層是如何組合?AQS?,實現(xiàn)了自己的那些功能,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步2022-03-03
javacv開發(fā)詳解之調(diào)用本機攝像頭視頻
這篇文章主要介紹了javacv開發(fā)詳解之調(diào)用本機攝像頭視頻,對javacv感興趣的同學,可以參考下2021-04-04

