RabbitMQ消費(fèi)者限流實(shí)現(xiàn)消息處理優(yōu)化
目錄結(jié)構(gòu)
導(dǎo)入依賴
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.5.0</version> </dependency> </dependencies>
修改yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 手動確認(rèn)模式
prefetch: 1 # 每次消費(fèi)僅1條消息
業(yè)務(wù)邏輯
為了驗(yàn)證是否一定要手動確認(rèn)才能真正消費(fèi)消息,如下我進(jìn)行了測試:首先我先讓生產(chǎn)者生產(chǎn)兩條消息在隊(duì)列當(dāng)中,如下圖1所示。其次再看代碼邏輯。當(dāng)啟動了消費(fèi)者代碼后入下圖2所示:雖然確確實(shí)實(shí)被限流了,有一條未確認(rèn)的消息,但當(dāng)我們關(guān)閉消費(fèi)者端的應(yīng)用后,就又會變成圖1所示。
圖1
/** * 消費(fèi)者的限流機(jī)制 * 1、確保Ack機(jī)制為手動機(jī)制:acknowledge-mode: manual * 2、每次消費(fèi)消息的個(gè)數(shù):prefetch: 1 只有手動確認(rèn)完后才會拉取下一條消息 */ @Component public class QosListener implements ChannelAwareMessageListener { @RabbitListener(queues = "test_queue_name") @Override public void onMessage(Message message, Channel channel) throws Exception { System.out.println("消費(fèi)者接受的消息為:" + new String(message.getBody())); } }
圖2
所以可以當(dāng)我們再次改變業(yè)務(wù)邏輯:進(jìn)行手動確認(rèn)后就可以發(fā)現(xiàn)消息確確實(shí)實(shí)被消費(fèi)了,如圖3所示。要注意哈:第二個(gè)是否批量簽收參數(shù)表示的是開啟消費(fèi)者后是否只會讀取一次消息,而消費(fèi)者限流prefetch表示的是每次讀取只能為一條消息。兩者的概念是不一樣的。
/** * 消費(fèi)者的限流機(jī)制 * 1、確保Ack機(jī)制為手動機(jī)制:acknowledge-mode: manual * 2、每次消費(fèi)消息的個(gè)數(shù):prefetch: 1 只有手動確認(rèn)完后才會拉取下一條消息 */ @Component public class QosListener implements ChannelAwareMessageListener { @RabbitListener(queues = "test_queue_name") @Override public void onMessage(Message message, Channel channel) throws Exception { Thread.sleep(5000); long deliveryTag = message.getMessageProperties().getDeliveryTag();// 消息的唯一標(biāo)識id System.out.println("消費(fèi)者接受的消息為:" + new String(message.getBody())); channel.basicAck(deliveryTag,true);//每5s讀一次消息(限流后每次為一條) } }
到此這篇關(guān)于RabbitMQ消費(fèi)者限流實(shí)現(xiàn)消息處理優(yōu)化的文章就介紹到這了,更多相關(guān)RabbitMQ消費(fèi)者限流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Synchronized?和?ReentrantLock?的實(shí)現(xiàn)原理及區(qū)別
這篇文章主要介紹了Synchronized?和?ReentrantLock?的實(shí)現(xiàn)原理及區(qū)別,文章為榮啊主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-09-09SpringBoot使用mybatis-plus分頁查詢無效的問題解決
MyBatis-Plus提供了很多便捷的功能,包括分頁查詢,本文主要介紹了SpringBoot使用mybatis-plus分頁查詢無效的問題解決,具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12springboot如何接收復(fù)雜參數(shù)(同時(shí)接收J(rèn)SON與文件)
文章介紹了在Spring Boot中同時(shí)處理JSON和文件上傳時(shí)使用`@RequestPart`注解的方法,`@RequestPart`可以接收多種格式的參數(shù),包括JSON和文件,并且可以作為`multipart/form-data`格式中的key2025-02-02SpringBoot同時(shí)支持HTTPS與HTTP的實(shí)現(xiàn)示例
本文主要介紹了SpringBoot同時(shí)支持HTTPS與HTTP的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Java中的關(guān)鍵字_動力節(jié)點(diǎn)Java學(xué)院整理
關(guān)鍵字也稱為保留字,是指Java語言中規(guī)定了特定含義的標(biāo)示符。對于保留字,用戶只能按照系統(tǒng)規(guī)定的方式使用,不能自行定義2017-04-04IDEA之web項(xiàng)目導(dǎo)入jar包方式
這篇文章主要介紹了IDEA之web項(xiàng)目導(dǎo)入jar包方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05