RabbitMQ實(shí)現(xiàn)Work Queue工作隊(duì)列的示例詳解
RabbitMQ Work Queue工作隊(duì)列

工作隊(duì)列(又稱任務(wù)隊(duì)列)的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。
相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個(gè)工作線程時(shí),這些工作線程將一起處理這些任務(wù)。

多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條消息只會(huì)被一個(gè)消費(fèi)者處理。
但是對(duì)于工作隊(duì)列,可以提高消息的處理速度,避免隊(duì)列中的消息堆積。
我們以一個(gè)例子來(lái)解釋work queue工作隊(duì)列。在生產(chǎn)者的服務(wù)中添加測(cè)試方法,通過(guò)循環(huán)的方式,向名為simple.queue隊(duì)列中發(fā)送50條消息,代碼和詳細(xì)描述如下:
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName="simple.queue";//隊(duì)列名稱
String message = "hello, message_";//發(fā)送的消息
for (int i=1;i<=50;i++){
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}
}在消費(fèi)者的服務(wù)模塊中,定義兩個(gè)消息監(jiān)聽,分別為listenSimpleQueue1和listenSimpleQueue2,讓它們都監(jiān)聽simple.queue隊(duì)列,并且設(shè)置休眠時(shí)間,使得消費(fèi)者1每秒處理50條消息,消費(fèi)者2每秒處理10條消息。
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
@Component
public class SpringRabbitListener {
@RabbitListener(queues="simple.queue")
public void listenSimpleQueue1(String msg) throws InterruptedException {
System.out.println("消費(fèi)者1已經(jīng)接收到simple.queue的消息:[" + msg + "]"+ LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues="simple.queue")
public void listenSimpleQueue2(String msg) throws InterruptedException {
System.err.println("消費(fèi)者2已經(jīng)接收到simple.queue的消息:[" + msg + "]"+LocalTime.now());
Thread.sleep(200);
}
}
消費(fèi)者的application.yaml文件,設(shè)置消費(fèi)者每次只能獲取一條消息,生產(chǎn)者和消費(fèi)者的配置文件相似。
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 192.168.220.13*
port: 5672
username: user
password: ******
virtual-host: /
Listener:
simple:
prefetch: 1 #每次只能獲取一條消息,處理完成才能獲取下一條消息 控制消費(fèi)者預(yù)取消息的上限
處理完成后,運(yùn)行項(xiàng)目,可以得到消費(fèi)者1和消費(fèi)者2都能消費(fèi)消息,并且可以根據(jù)休眠時(shí)間有序進(jìn)行工作。

到此這篇關(guān)于RabbitMQ實(shí)現(xiàn)Work Queue工作隊(duì)列的示例詳解的文章就介紹到這了,更多相關(guān)RabbitMQ Work Queue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java jdk1.8 使用stream流進(jìn)行l(wèi)ist 分組歸類操作
這篇文章主要介紹了java jdk1.8 使用stream流進(jìn)行l(wèi)ist 分組歸類操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-10-10
SpringBoot JPA懶加載失效的解決方案(親測(cè)有效)
這篇文章主要介紹了SpringBoot JPA懶加載失效的解決方案(親測(cè)有效),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
解決常見的Eclipse SVN插件報(bào)錯(cuò)方法詳解
本篇文章是對(duì)常見的Eclipse SVN插件報(bào)錯(cuò)方法進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05
關(guān)于stream().sorted()以及java中常用的比較器排序
這篇文章主要介紹了關(guān)于stream().sorted()以及java中常用的比較器排序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
在SpringBoot項(xiàng)目中利用maven的generate插件
今天小編就為大家分享一篇關(guān)于在SpringBoot項(xiàng)目中利用maven的generate插件,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-01-01
Java實(shí)現(xiàn)線程的暫停和恢復(fù)的示例詳解
這幾天的項(xiàng)目中,客戶給了個(gè)需求,希望我可以開啟一個(gè)任務(wù),想什么時(shí)候暫停就什么時(shí)候暫停,想什么時(shí)候開始就什么時(shí)候開始,所以本文小編給大家介紹了Java實(shí)現(xiàn)線程的暫停和恢復(fù)的示例,需要的朋友可以參考下2023-11-11

