Spring3?中?RabbitMQ?的使用與常見場(chǎng)景分析
一、初識(shí) MQ
傳統(tǒng)的單體架構(gòu),分布式架構(gòu)的同步調(diào)用里,無論是方法調(diào)用,還是 OpenFeign 難免會(huì)有以下問題:
- 擴(kuò)展性差(高耦合,需要依賴對(duì)應(yīng)的服務(wù),同樣的事件,不斷有新需求,這個(gè)事件的業(yè)務(wù)代碼會(huì)越來越臃腫,這些子業(yè)務(wù)都寫在一起)
- 性能下降(等待響應(yīng),最終整個(gè)業(yè)務(wù)的響應(yīng)時(shí)長(zhǎng)就是每次遠(yuǎn)程調(diào)用的執(zhí)行時(shí)長(zhǎng)之和)
- 級(jí)聯(lián)失?。ㄈ绻且粋€(gè)事務(wù),一個(gè)服務(wù)失敗就會(huì)導(dǎo)致全部回滾,若是分布式事務(wù)就更加麻煩了,但其實(shí)一些行為的失敗不應(yīng)該導(dǎo)致整體回滾)
- 服務(wù)宕機(jī)(如果服務(wù)調(diào)用者未考慮服務(wù)提供者的性能,導(dǎo)致提供者因?yàn)檫^度請(qǐng)求而宕機(jī))
但如果不是很要求同步調(diào)用,其實(shí)也可以用異步調(diào)用,如果是單體架構(gòu),你可能很快能想到一個(gè)解決方案,就是阻塞隊(duì)列實(shí)現(xiàn)消息通知:
但是在分布式架構(gòu)下,可能就需要一個(gè)中間件級(jí)別的阻塞隊(duì)列,這就是我們要學(xué)習(xí)的 Message Queue 消息隊(duì)列,簡(jiǎn)稱 MQ,而現(xiàn)在流行的 MQ 還不少,在實(shí)現(xiàn)其基本的消息通知功能外,還有一些不錯(cuò)的擴(kuò)展
以 RabbitMQ 和 Kafka 為例:
RabbitMQ | Kafka | |
---|---|---|
公司/社區(qū) | Rabbit | Apache |
開發(fā)語言 | Erlang | Scala & Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | 自定義協(xié)議 |
可用性 | 高 | 高 |
單機(jī)吞吐量 | 一般 | 非常高(Kafka 亮點(diǎn)) |
消息延遲 | 微秒級(jí) | 毫秒以內(nèi) |
消息可靠性 | 高 | 一般 |
消息延遲指的是,消息到隊(duì)列,并在隊(duì)列中“就緒”的時(shí)間與預(yù)期時(shí)間的差距,其實(shí)就是數(shù)據(jù)在中間件中流動(dòng)的耗時(shí),預(yù)期時(shí)間可以是現(xiàn)在、幾毫秒后、幾秒后、幾天后…
據(jù)統(tǒng)計(jì),目前國(guó)內(nèi)消息隊(duì)列使用最多的還是 RabbitMQ,再加上其各方面都比較均衡,穩(wěn)定性也好,因此我們課堂上選擇 RabbitMQ 來學(xué)習(xí)。
二、RabbitMQ 安裝
Docker 安裝 RabbitMQ:
mkdir /root/mq cd /root/mq docker rm mq-server -f docker rmi rabbitmq:3.8-management -f docker volume rm mq-plugins -f docker pull rabbitmq:3.8-management # 插件數(shù)據(jù)卷最好還是直接掛載 volume,而不是掛載我們的目錄 docker run \ --name mq-server \ -e RABBITMQ_DEFAULT_USER=xxx \ -e RABBITMQ_DEFAULT_PASS=xxx \ --hostname mq1 \ -v mq-plugins:/plugins \ -p 15672:15672 \ -p 5672:5672 \ -d rabbitmq:3.8-management
三、RabbitMQ 基本知識(shí)
(1)架構(gòu)
15672:RabbitMQ 提供的管理控制臺(tái)的端口
5672:RabbitMQ 的消息發(fā)送處理接口
用戶名密碼就是安裝時(shí),啟動(dòng)容器時(shí)指定的用戶名密碼
MQ 對(duì)應(yīng)的就是這里的消息代理 Broker:
RabbitMQ 詳細(xì)架構(gòu)圖:
其中包含幾個(gè)概念:
publisher
:生產(chǎn)者,也就是發(fā)送消息的一方consumer
:消費(fèi)者,也就是消費(fèi)消息的一方queue
:隊(duì)列,存儲(chǔ)消息。生產(chǎn)者投遞的消息會(huì)暫存在消息隊(duì)列中,等待消費(fèi)者處理exchange
:交換機(jī),負(fù)責(zé)消息路由。生產(chǎn)者發(fā)送的消息由交換機(jī)決定投遞到哪個(gè)隊(duì)列。virtual host
:虛擬主機(jī),起到數(shù)據(jù)隔離的作用。每個(gè)虛擬主機(jī)相互獨(dú)立,有各自的 exchange、queue
現(xiàn)在你可能只認(rèn)識(shí)生產(chǎn)者、消費(fèi)者、隊(duì)列,其他是什么呢?
其實(shí)你可以理解為 MQ 也是存儲(chǔ)東西的,存儲(chǔ)的就是消息,virtual host 就是數(shù)據(jù)庫,queue 就是表,消息就是一行數(shù)據(jù),而 MQ 有特殊的機(jī)制,消息先通過 exchange 再?zèng)Q定前往哪個(gè) queue
管理控制臺(tái)的使用就不多說了
(2)五大模式
這只是最常見的五種模式:
簡(jiǎn)單模式
工作模式
發(fā)布訂閱模式
關(guān)聯(lián)交換機(jī)的隊(duì)列都能收到一份消息,廣播
路由模式
關(guān)聯(lián)交換機(jī)時(shí),提供 routing key(可以是多個(gè),隊(duì)列之間可以重復(fù)),發(fā)布消息時(shí)提供一個(gè) routing key,由此發(fā)送給指定的隊(duì)列
值得注意的是,簡(jiǎn)單模式和工作模式,其實(shí)也是有交換機(jī)的,任何隊(duì)列都會(huì)綁定一個(gè)默認(rèn)交換機(jī)
""
,類型是 direct,routing key 為隊(duì)列的名稱
主題模式
路由模式的基礎(chǔ)上,隊(duì)列關(guān)聯(lián)交換機(jī)時(shí) routing key 可以是帶通配符的
routing key 的單詞通過
.
分割,#
匹配 n 個(gè)單詞(n ≥ 0),*
只匹配一個(gè)單詞例如 #.red:
可以匹配的 routing key:p1.red、red、p2.p1.red
在發(fā)布消息時(shí),要使用具體的 routing key,交換機(jī)發(fā)送給匹配的隊(duì)列
(3)數(shù)據(jù)隔離 隔離 virtual host
隔離用戶(賦予訪問權(quán)限)
四、RabbitMQ 基本使用 Spring AMQP
引入 RabbitMQ 相關(guān)的 SDK,可以通過創(chuàng)建連接 Connection、創(chuàng)建通道 Channel,用 Channel 進(jìn)行操作,接受消息也差不多,不過多演示:
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立連接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.設(shè)置連接參數(shù),分別是:主機(jī)名、端口號(hào)、vhost、用戶名、密碼 factory.setHost("xx.xx.xx.xx"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("xxx"); factory.setPassword("xxx"); // 1.2.建立連接 Connection connection = factory.newConnection(); // 2.創(chuàng)建通道Channel Channel channel = connection.createChannel(); // 3.創(chuàng)建隊(duì)列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.發(fā)送消息 String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("發(fā)送消息成功:【" + message + "】"); // 5.關(guān)閉通道和連接 channel.close(); connection.close(); } }
但比較麻煩,Spring AMQP 框架可以自動(dòng)裝配 RabbitMQ 的操作對(duì)象 RabbitTemplate,這樣我們就可以更方便的操作 MQ,并充分發(fā)揮其特性
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
默認(rèn)包含 RabbitMQ 的實(shí)現(xiàn),如果你想對(duì)接其他 AMQP 協(xié)議的 MQ,得自己實(shí)現(xiàn)其抽象封裝的接口
(1)發(fā)送消息
注意,下面是 Spring3 的寫法,所以會(huì)有點(diǎn)不一樣,可能看不懂,稍后解釋!
消息發(fā)送器封裝:
@Repository @RequiredArgsConstructor @Slf4j public class RabbitMQSender { private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread"); private final RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setTaskExecutor(EXECUTOR); } private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> { log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage()); return null; }; private MessagePostProcessor delayMessagePostProcessor(long delay) { return message -> { // 小于 0 也是立即執(zhí)行 // setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的 message.getMessageProperties().setDelay((int) Math.max(delay, 0)); return message; }; }; private CorrelationData newCorrelationData() { return new CorrelationData(UUIDUtil.uuid32()); } /** * @param exchange 交換機(jī) * @param routingKey routing key * @param msg 消息 * @param delay 延遲時(shí)間(如果是延遲交換機(jī),delay 才有效) * @param maxRetries 最大重試機(jī)會(huì) * @param <T> 消息的對(duì)象類型 */ private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){ log.info("準(zhǔn)備發(fā)送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}", exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries); CorrelationData correlationData = newCorrelationData(); MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay); correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() { private int retryCount = 0; // 一次 send 從始至終都用的是一個(gè) Consumer 對(duì)象,所以作用的都是同一個(gè)計(jì)數(shù)器 @Override public void accept(CorrelationData.Confirm confirm) { Optional.ofNullable(confirm).ifPresent(c -> { if(c.isAck()) { log.info("ACK {} 消息成功到達(dá),{}", correlationData.getId(), c.getReason()); } else { log.warn("NACK {} 消息未能到達(dá),{}", correlationData.getId(), c.getReason()); if(retryCount >= maxRetries) { log.error("次數(shù)到達(dá)上限 {}", maxRetries); return; } retryCount++; log.warn("開始第 {} 次重試", retryCount); CorrelationData cd = newCorrelationData(); cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR); rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd); } }); } }, EXECUTOR); rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData); } public void sendMessage(String exchange, String routingKey, Object msg) { send(exchange, routingKey, msg, 0, 0); } public void sendDelayMessage(String exchange, String routingKey, Object msg, long delay){ send(exchange, routingKey, msg, delay, 0); } public void sendWithConfirm(String exchange, String routingKey, Object msg, int maxReties) { send(exchange, routingKey, msg, 0, maxReties); } public void sendDelayMessageWithConfirm(String exchange, String routingKey, Object msg, long delay, int maxReties) { send(exchange, routingKey, msg, delay, maxReties); } }
(2)接受消息
監(jiān)聽器:
- RabbitTemplate 是可以主動(dòng)獲取消息的,也可以不實(shí)時(shí)監(jiān)聽,但是一般情況都是監(jiān)聽,有消息就執(zhí)行
- 監(jiān)聽的是 queue,若 queue 不存在,就會(huì)根據(jù)注解創(chuàng)建一遍
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "xxx"), exchange = @Exchange(name = "xxx", delayed = "true"), key = {"xxx"} )) public void xxx(X x) { }
(3)聲明交換機(jī)與隊(duì)列
可以通過 @Bean 創(chuàng)建 Bean 對(duì)象的方式去聲明,可以自行搜索,我更喜歡監(jiān)聽器注解的形式,而且 Bean 的方式,可能會(huì)因?yàn)榕渲貌煌耆粯?,?dǎo)致其他配置類的交換機(jī)隊(duì)列無法聲明(現(xiàn)象如此,底層為啥我不知道)
(4)消息轉(zhuǎn)換器
消息是一個(gè)字符串,但為了滿足更多需求,需要將一個(gè)對(duì)象序列化成一個(gè)字符串,但默認(rèn)的序列化實(shí)現(xiàn)貌似用的是 java 對(duì)象的序列化,這種方式可能得同一個(gè)程序的 java 類才能反序列化成功,所以我們應(yīng)該選擇分布式的序列化方式,比如 json
@Configuration @RequiredArgsConstructor @Slf4j public class MessageConverterConfig { @Bean public MessageConverter messageConverter(){ // 1. 定義消息轉(zhuǎn)換器 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(JsonUtil.OBJECT_MAPPER); // 2. 配置自動(dòng)創(chuàng)建消息 id,用于識(shí)別不同消息 jackson2JsonMessageConverter.setCreateMessageIds(Boolean.TRUE); return jackson2JsonMessageConverter; } }
這里的 JsonUtil.OBJECT_MAPPER,就是框架的或者自己實(shí)現(xiàn)的 ObjectMapper
(5)配置文件
spring: rabbitmq: host: ${xxx.mq.host} # rabbitMQ 的 ip 地址 port: ${xxx.mq.port} # 端口 username: ${xxx.mq.username} password: ${xxx.mq.password} virtual-host: ${xxx.mq.virtual-host} publisher-confirm-type: correlated publisher-returns: true template: mandatory: true # 若是 false 則直接丟棄了,并不會(huì)發(fā)送者回執(zhí) listener: simple: prefetch: 1 # 預(yù)取為一個(gè)(消費(fèi)完才能拿下一個(gè)) concurrency: 2 # 消費(fèi)者最少 2 個(gè)線程 max-concurrency: 10 # 消費(fèi)者最多 10 個(gè)線程 auto-startup: true # 為 false 監(jiān)聽者不會(huì)實(shí)時(shí)創(chuàng)建和監(jiān)聽,為 true 監(jiān)聽的過程中,若 queue 不存在,會(huì)再根據(jù)注解進(jìn)行創(chuàng)建,創(chuàng)建后只監(jiān)聽 queue,declare = "false" 才是不自動(dòng)聲明 default-requeue-rejected: false # 拒絕后不 requeue(成為死信,若沒有綁定死信交換機(jī),就真的丟了) acknowledge-mode: auto # 消費(fèi)者執(zhí)行成功 ack、異常 nack(manual 為手動(dòng)、none 代表無論如何都是 ack) retry: # 這個(gè)屬于 spring amqp 的 retry 機(jī)制 enabled: false # 不開啟失敗重試 # initial-interval: 1000 # multiplier: 2 # max-attempts: 3 # stateless: true # true 代表沒有狀態(tài),若有消費(fèi)者包含事務(wù),這里改為 false
五、常見問題
(1)RabbitMQ 如何保證消息可靠性
保證消息可靠性、不丟失。主要從三個(gè)層面考慮
如果報(bào)錯(cuò)可以先記錄到日志中,再去修復(fù)數(shù)據(jù)(保底)
1、生產(chǎn)者確認(rèn)機(jī)制
生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
publisher-confirm,針對(duì)的是消息從發(fā)送者到交換機(jī)的可靠性,成功則進(jìn)行下一步,失敗返回 NACK
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread"); private final RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setTaskExecutor(EXECUTOR); } private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> { log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage()); return null; }; private MessagePostProcessor delayMessagePostProcessor(long delay) { return message -> { // 小于 0 也是立即執(zhí)行 // setDelay 才是給 RabbitMQ 看的,setReceivedDelay 是給 publish-returns 看的 message.getMessageProperties().setDelay((int) Math.max(delay, 0)); return message; }; }; private CorrelationData newCorrelationData() { return new CorrelationData(UUIDUtil.uuid32()); } /** * @param exchange 交換機(jī) * @param routingKey routing key * @param msg 消息 * @param delay 延遲時(shí)間(如果是延遲交換機(jī),delay 才有效) * @param maxRetries 最大重試機(jī)會(huì) * @param <T> 消息的對(duì)象類型 */ private <T> void send(String exchange, String routingKey, T msg, long delay, int maxRetries){ log.info("準(zhǔn)備發(fā)送消息,exchange: {}, routingKey: {}, msg: {}, delay: {}s, maxRetries: {}", exchange, routingKey, msg, TimeUnit.MILLISECONDS.toSeconds(delay), maxRetries); CorrelationData correlationData = newCorrelationData(); MessagePostProcessor delayMessagePostProcessor = delayMessagePostProcessor(delay); correlationData.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(new Consumer<>() { private int retryCount = 0; // 一次 send 從始至終都用的是一個(gè) Consumer 對(duì)象,所以作用的都是同一個(gè)計(jì)數(shù)器 @Override public void accept(CorrelationData.Confirm confirm) { Optional.ofNullable(confirm).ifPresent(c -> { if(c.isAck()) { log.info("ACK {} 消息成功到達(dá),{}", correlationData.getId(), c.getReason()); } else { log.warn("NACK {} 消息未能到達(dá),{}", correlationData.getId(), c.getReason()); if(retryCount >= maxRetries) { log.error("次數(shù)到達(dá)上限 {}", maxRetries); return; } retryCount++; log.warn("開始第 {} 次重試", retryCount); CorrelationData cd = newCorrelationData(); cd.getFuture().exceptionallyAsync(ON_FAILURE, EXECUTOR).thenAcceptAsync(this, EXECUTOR); rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, cd); } }); } }, EXECUTOR); rabbitTemplate.convertAndSend(exchange, routingKey, msg, delayMessagePostProcessor, correlationData); }
Spring3 的 RabbitMQ Confirm,需要配置為 correlated,發(fā)送消息時(shí)提供 CorrelationData,也就是與消息關(guān)聯(lián)的數(shù)據(jù),包括發(fā)送者確認(rèn)時(shí)的回調(diào)方法
要想提供 Confirm 的回調(diào)辦法,需要配置 correlationData.getFuture() 返回的 CompletableFuture 對(duì)象(新的 JUC 工具類,可以查一查如何使用)
配置后,在未來根據(jù)回調(diào)函數(shù)進(jìn)行處理(當(dāng)然也可以直接設(shè)置在 RabbitTemplate 對(duì)象的 ConfirmCallBack)
還可以自己實(shí)現(xiàn)消息的發(fā)送者重試:
publisher-returns,針對(duì)的是消息從交換機(jī)到隊(duì)列的可靠性,成功則返回 ACK,失敗觸發(fā) returns 的回調(diào)方法
@Component @RequiredArgsConstructor @Slf4j public class PublisherReturnsCallBack implements RabbitTemplate.ReturnsCallback { // 不存在 routing key 對(duì)應(yīng)的隊(duì)列,那在我看來轉(zhuǎn)發(fā)到零個(gè)是合理的現(xiàn)象,但在這里也認(rèn)為是路由失?。∕Q 認(rèn)為消息一定至少要進(jìn)入一個(gè)隊(duì)列,之后才能被處理,這就是可靠性)(反正就是回執(zhí)了,你愛咋處理是你自己的事情) @Override public void returnedMessage(ReturnedMessage returnedMessage) { // 可能一些版本的 mq 會(huì)因?yàn)槭茄訒r(shí)交換機(jī),導(dǎo)致發(fā)送者回執(zhí),只要沒有 NACK 這種情況其實(shí)并不是不可靠(其實(shí)我也不知道有沒有版本會(huì)忽略) // 但是其實(shí)不忽略也不錯(cuò),畢竟者本來就是特殊情況,一般交換機(jī)是不存儲(chǔ)的,但是這個(gè)臨時(shí)存儲(chǔ)消息 // 但這樣也就代表了,延時(shí)后消息路由失敗是沒法再次處理的(因?yàn)槲覀兘唤o延時(shí)交換機(jī)后就不管了,可靠性有 mq 自己保持) MessageProperties messageProperties = returnedMessage.getMessage().getMessageProperties(); // 這里的 message 并不是原本的 message,是額外的封裝,x-delay 在 publish-returns 里面封裝到 receiveDelay 里了 Integer delay = messageProperties.getReceivedDelay(); // 如果不是延時(shí)交換機(jī),卻設(shè)置了 delay 大于 0,是不會(huì)延時(shí)的,所以是其他原因?qū)е碌模ㄒ苑廊f一把消息記錄到日志里) if(Objects.nonNull(delay) && delay.compareTo(0) > 0) { log.info("交換機(jī) {}, 路由鍵 {} 消息 {} 延遲 {} s", returnedMessage.getExchange(), returnedMessage.getRoutingKey(), messageProperties, TimeUnit.MILLISECONDS.toSeconds(delay)); return; } log.warn("publisher-returns 發(fā)送者回執(zhí)(應(yīng)答碼{},應(yīng)答內(nèi)容{})(消息 {} 成功到達(dá)交換機(jī) {},但路由失敗,路由鍵為 {})", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getMessage(), returnedMessage.getExchange(), returnedMessage.getRoutingKey()); } }
RabbitMQSender:
private final static ThreadPoolExecutor EXECUTOR = ThreadPoolUtil.getIoTargetThreadPool("Rabbit-MQ-Thread"); private final RabbitTemplate rabbitTemplate; private final PublisherReturnsCallBack publisherReturnsCallBack; @PostConstruct public void init() { rabbitTemplate.setTaskExecutor(EXECUTOR); // 設(shè)置統(tǒng)一的 publisher-returns(confirm 也可以設(shè)置統(tǒng)一的,但最好還是在發(fā)送時(shí)設(shè)置在 future 里) // rabbitTemplate 的 publisher-returns 同一時(shí)間只能存在一個(gè) // 因?yàn)?publisher confirm 后,其實(shí) exchange 有沒有轉(zhuǎn)發(fā)成功,publisher 沒必要每次發(fā)送都關(guān)注這個(gè) exchange 的內(nèi)部職責(zé),更多的是“系統(tǒng)與 MQ 去約定” rabbitTemplate.setReturnsCallback(publisherReturnsCallBack); }
同理你也可以按照自己的想法進(jìn)行重試…
在測(cè)試練習(xí)階段里,這個(gè)過程是異步回調(diào)的,如果是單元測(cè)試,發(fā)送完消息進(jìn)程就結(jié)束了,可能就沒回調(diào),程序就結(jié)束了,自然就看不到回調(diào)時(shí)的日志
如果既沒有 ACK 也沒有 NACK,也沒有發(fā)布者回執(zhí),那就相當(dāng)于這個(gè)消息銷聲匿跡了,沒有任何的回應(yīng),那么就會(huì)拋出異常,我們可以處理這個(gè)異常,比如打印日志、重發(fā)之類的…
private final static Function<Throwable, ? extends CorrelationData.Confirm> ON_FAILURE = ex -> { log.error("處理 ack 回執(zhí)失敗, {}", ex.getMessage()); return null; };
2、持久化
消息隊(duì)列的數(shù)據(jù)持久化,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失,其中的交換機(jī)、隊(duì)列、和消息都要做持久化
默認(rèn)都是持久化的
3、消費(fèi)者確認(rèn)
隊(duì)列的消息出隊(duì)列,并不會(huì)立即刪除,而是等待消費(fèi)者返回 ACK 或者 NACK
消費(fèi)者要什么時(shí)候發(fā)送 ACK 呢?
- 1)RabbitMQ投遞消息給消費(fèi)者
- 2)消費(fèi)者獲取消息后,返回ACK給RabbitMQ
- 3)RabbitMQ刪除消息
- 4)消費(fèi)者宕機(jī),消息尚未處理
如果出現(xiàn)這種場(chǎng)景,就是不可靠的,所以應(yīng)該是消息處理后,再發(fā)送 ACK
Spring AMQP 有三種消費(fèi)者確認(rèn)模式:
- manual,手段 ack,自己用 rabbitTemplate 去發(fā)送 ACK/NACK(這個(gè)比較麻煩,不用 RabbitListener 接受消息才必須用這個(gè))
- auto,配合 RabbitListener 注解,代碼若出現(xiàn)異常,NACK,成功則 ACK
- none,獲得消息后直接 ACK,無論是否執(zhí)行成功
出現(xiàn) NACK 后要如何處理(此過程還在我們的服務(wù)器):
- 拒絕(默認(rèn))
- 重新入隊(duì)列
- 返回 ACK,消費(fèi)者重新發(fā)布消息指定的交換機(jī)
@Configuration @RequiredArgsConstructor @Slf4j public class MessageRecovererConfig { @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RejectAndDontRequeueRecoverer(); // nack、直接 reject 和不 requeue,成為死信(默認(rèn)) // return new ImmediateRequeueMessageRecoverer(); // nack、requeue // return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); // ack、發(fā)送給指定的交換機(jī),confirm 機(jī)制需要設(shè)置到 rabbitTemplate 里 } }
Spring 提供的 retry 機(jī)制,在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無限制的requeue到mq隊(duì)列。
spring: rabbitmq: listener: simple: acknowledge-mode: auto # 消費(fèi)者執(zhí)行成功 ack、異常 nack(manual 為手動(dòng)、none 代表無論如何都是 ack) retry: # 這個(gè)屬于 spring amqp 的 retry 機(jī)制 enabled: false # 不開啟失敗重試 initial-interval: 1000 # 第一次重試時(shí)間間隔 multiplier: 3 # 每次重試間隔的倍數(shù) max-attempts: 4 # 最大接受次數(shù) stateless: true # true 代表沒有狀態(tài),若有消費(fèi)者包含事務(wù),這里改為 false
解釋:第一次失敗,一秒后重試、第二次失敗,三秒后重試,第三次失敗,九秒后重試,第四次失敗就沒機(jī)會(huì)了(SpringAMQP會(huì)拋出異常AmqpRejectAndDontRequeueException)
失敗之后根據(jù)對(duì)應(yīng)的處理策略進(jìn)行處理
(2)死信交換機(jī)
消息過期、消息執(zhí)行失敗并且不重試也不重新入隊(duì)列,堆積過多等情況,消息會(huì)成為死信,若隊(duì)列綁定死信交換機(jī),則轉(zhuǎn)發(fā)給死信交換機(jī),若沒有則直接丟棄
隊(duì)列1 -> 死信交換機(jī) -> 隊(duì)列2,這個(gè)過程是消息隊(duì)列內(nèi)部保證的可靠性,消息也沒有包含原發(fā)送者的信息,甚至連接已經(jīng)斷開了,所以沒有 publisher-confirm 也沒有 publisher-returns
這個(gè)機(jī)制和 republish 有點(diǎn)像,但是有本質(zhì)的區(qū)別,republish 是消費(fèi)者重發(fā),而這里是隊(duì)列將死信轉(zhuǎn)發(fā)給死信交換機(jī)
死信的情況:
- nack && requeue == false
- 超時(shí)未消費(fèi)
- 隊(duì)列滿了,由于隊(duì)列的特性,隊(duì)列頭會(huì)先成為死信
(3)延遲功能如何實(shí)現(xiàn)
剛才提到死信的誕生可能是超時(shí)未消費(fèi),那么其實(shí)這個(gè)點(diǎn)也可以簡(jiǎn)單的實(shí)現(xiàn)一個(gè)延遲隊(duì)列:
隊(duì)列為一個(gè)不被監(jiān)聽的專門用來延遲消息發(fā)送的緩沖帶,其死信交換機(jī)才是目標(biāo)交換機(jī),
message.getMessageProperties().setExpiration("1000");
設(shè)置的是過期時(shí)間,其本意并不是延遲,是可以實(shí)現(xiàn)延遲~
另外,隊(duì)列本身也能設(shè)置 ttl 過期時(shí)間,但并不是隊(duì)列的過期時(shí)間(顯然不合理,截止后無論啥都丟了,冤不冤啊,至少我想不到這種場(chǎng)景),而是隊(duì)列中的消息存活的最大時(shí)間,消息的過期時(shí)間和這個(gè)取一個(gè)最小值才是真實(shí)的過期時(shí)間
值得注意的是,雖然能實(shí)現(xiàn)延時(shí)消息的功能,但是
實(shí)現(xiàn)復(fù)雜延遲可能不準(zhǔn)確,因?yàn)殛?duì)列的特性,如果隊(duì)列頭未出隊(duì)列,哪怕其后者出現(xiàn)死信,也只能乖乖等前面的先出去之后才能前往死信交換機(jī)(例如消息的 ttl 分別為 9s、3s、1s,最終三個(gè)消息會(huì)被同時(shí)轉(zhuǎn)發(fā),因?yàn)?ldquo;最長(zhǎng)壽的”排在了前面)
這種方式的順序優(yōu)先級(jí)大于時(shí)間優(yōu)先級(jí)
而 RabbitMQ 也提供了一個(gè)插件,叫 DelayExchange 延時(shí)交換機(jī),專門用來實(shí)現(xiàn)延時(shí)功能
Scheduling Messages with RabbitMQ | RabbitMQ
請(qǐng)自行上網(wǎng)下載
延時(shí)交換機(jī)的聲明:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay" )) public void listenDelayMessage(String msg){ log.info("接收到delay.queue的延遲消息:{}", msg); }
延時(shí)消息的發(fā)送:
private MessagePostProcessor delayMessagePostProcessor(long delay) { return message -> { // 小于 0 也是立即執(zhí)行 message.getMessageProperties().setDelay((int) Math.max(delay, 0)); return message; }; };
這里設(shè)置的是 Delay,不是過期時(shí)間,哪怕超過了時(shí)間也不叫做死信
期間一直存在延時(shí)交換機(jī)的硬存里,延遲消息插件內(nèi)部會(huì)維護(hù)一個(gè)本地?cái)?shù)據(jù)庫表,同時(shí)使用 Elang Timers 功能實(shí)現(xiàn)計(jì)時(shí)。如果消息的延遲時(shí)間設(shè)置較長(zhǎng),可能會(huì)導(dǎo)致堆積的延遲消息非常多,會(huì)帶來較大的CPU開銷,同時(shí)延遲消息的時(shí)間會(huì)存在誤差。
(4)消息堆積如何解決
死信的成因還可能是堆疊過多
我在實(shí)際的開發(fā)中,沒遇到過這種情況,不過,如果發(fā)生了堆積的問題,解決方案也所有很多的
- 提高消費(fèi)者的消費(fèi)能力 ,可以使用多線程消費(fèi)任務(wù)
- 增加更多消費(fèi)者,提高消費(fèi)速度,使用工作隊(duì)列模式, 設(shè)置多個(gè)消費(fèi)者消費(fèi)消費(fèi)同一個(gè)隊(duì)列中的消息
- 擴(kuò)大隊(duì)列容積,提高堆積上限
但是,RabbitMQ 隊(duì)列占的是內(nèi)存,間接性的落盤,提高上限最終的結(jié)果很有可能就是反復(fù)落庫,特別不穩(wěn)定,且并沒有解決消息堆積過多的問題
我們可以使用 RabbitMQ 惰性隊(duì)列,惰性隊(duì)列的好處主要是
- 接收到消息后直接存入磁盤而非內(nèi)存,雖然慢,但沒有間歇性的 page-out,性能比較穩(wěn)定
- 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存,正常消費(fèi)后就刪除了
- 基于磁盤存儲(chǔ),消息上限高,支持?jǐn)?shù)百萬條的消息存儲(chǔ)
聲明方式:
而要設(shè)置一個(gè)隊(duì)列為惰性隊(duì)列,只需要在聲明隊(duì)列時(shí),指定x-queue-mode屬性為lazy即可。可以通過命令行將一個(gè)運(yùn)行中的隊(duì)列修改為惰性隊(duì)列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解讀:
- rabbitmqctl :RabbitMQ的命令行工具
- set_policy :添加一個(gè)策略
- Lazy :策略名稱,可以自定義
- "^lazy-queue$" :用正則表達(dá)式匹配隊(duì)列的名字
- '{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式
- --apply-to queues :策略的作用對(duì)象,是所有的隊(duì)列
x-queue-mode 參數(shù)的值為 lazy
@RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = "xxx"), value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-mode", value = "lazy")), key = "xxx" ))
交換機(jī)、隊(duì)列擴(kuò)展屬性叫參數(shù),消息的拓展屬性叫頭部,擴(kuò)展屬性一般都以 x- 開頭(extra)
消息堆積問題的解決方案?
- 隊(duì)列上綁定多個(gè)消費(fèi)者,提高消費(fèi)速度
- 使用惰性隊(duì)列,可以再mq中保存更多消息
惰性隊(duì)列的優(yōu)點(diǎn)有哪些?
- 基于磁盤存儲(chǔ),消息上限高
- 沒有間歇性的 page-out,性能比較穩(wěn)定
惰性隊(duì)列的缺點(diǎn)有哪些?
- 基于磁盤存儲(chǔ),消息時(shí)效性會(huì)降低
- 性能受限于磁盤的IO
(5)高可用如何保證
RabbitMQ 在服務(wù)大規(guī)模項(xiàng)目時(shí),一般情況下不會(huì)像數(shù)據(jù)庫那樣存儲(chǔ)的瓶頸,用惰性隊(duì)列已經(jīng)是很頂天了的,其特性和用途不會(huì)有太極端的存儲(chǔ)壓力
更多的是在并發(fā)情況下,處理消息的能力有瓶頸,可能出現(xiàn)節(jié)點(diǎn)宕機(jī)的情況,而避免單節(jié)點(diǎn)宕機(jī),數(shù)據(jù)丟失、無法提供服務(wù)等問題需要解決,也就是需要保證高可用性
Erlang 是一種面向并發(fā)的語言,天然支持集群模式,RabbitMQ 的集群有兩種模式:
- 普通集群:是一種分布式集群,將隊(duì)列分散到集群的各個(gè)節(jié)點(diǎn),從而提高整個(gè)集群的并發(fā)能力
- 鏡像集群:是一種主從集群,在普通集群的基礎(chǔ)上,添加了主從備份的功能,提高集群的數(shù)據(jù)可用性
鏡像集群雖然支持主從,但主從同步并不是強(qiáng)一致的,某些情況下可能有數(shù)據(jù)丟失的風(fēng)險(xiǎn)(雖然重啟能解決,但那不是強(qiáng)一致,而是最終一致),因此 RabbitMQ 3.8 以后,推出了新的功能:仲裁隊(duì)列來代替鏡像集群,底層采用 Raft 協(xié)議確保主從的數(shù)據(jù)一致性
1、普通集群
- 各個(gè)節(jié)點(diǎn)之間,實(shí)時(shí)同步 MQ 元數(shù)據(jù)(一些靜態(tài)的共享的數(shù)據(jù)):
- 交換機(jī)的信息隊(duì)列的信息
但不包括隊(duì)列中的消息(動(dòng)態(tài)的數(shù)據(jù)不同步)
監(jiān)聽隊(duì)列的時(shí)候,如果監(jiān)聽的節(jié)點(diǎn)不存在該隊(duì)列(只是知道元數(shù)據(jù)),當(dāng)前節(jié)點(diǎn)會(huì)訪問隊(duì)列所在的節(jié)點(diǎn),該節(jié)點(diǎn)返回?cái)?shù)據(jù)到當(dāng)前節(jié)點(diǎn)并返回給監(jiān)聽者
隊(duì)列所在節(jié)點(diǎn)宕機(jī),隊(duì)列中的消息就會(huì)“丟失”(是在重啟之前,這個(gè)消息就消失無法被處理的意思)
如何部署,上網(wǎng)搜搜就行
2、鏡像集群
各個(gè)節(jié)點(diǎn)之間,實(shí)時(shí)同步 MQ 元數(shù)據(jù)(一些靜態(tài)的共享的數(shù)據(jù)):
- 交換機(jī)的信息
- 隊(duì)列的信息
本質(zhì)是主從模式,創(chuàng)建隊(duì)列的節(jié)點(diǎn)為主節(jié)點(diǎn),其他節(jié)點(diǎn)為鏡像節(jié)點(diǎn),隊(duì)列中的消息會(huì)從主節(jié)點(diǎn)備份到鏡像節(jié)點(diǎn)中
注意
- 像 Redis 那樣的主從集群,同步都是全部同步來著
- 但 RabbitMQ 集群的主從模式比較特別,他的粒度是隊(duì)列,而不是全部
也就是說,一個(gè)隊(duì)列的主節(jié)點(diǎn),可能是另一個(gè)隊(duì)列的鏡像節(jié)點(diǎn),所以分析某個(gè)場(chǎng)景的時(shí)候,要確認(rèn)是哪個(gè)隊(duì)列,單獨(dú)進(jìn)行觀察分析討論
- 不同隊(duì)列之間只有交互,不會(huì)相互影響數(shù)據(jù)同步
針對(duì)某一個(gè)隊(duì)列,所有寫操作都在主節(jié)點(diǎn)完成,然后同步給鏡像節(jié)點(diǎn),讀操作任何一個(gè)都 ok
主節(jié)點(diǎn)宕機(jī),鏡像節(jié)成為新的主節(jié)點(diǎn)
鏡像集群有三種模式:
- exactly 準(zhǔn)確模式,指定副本數(shù) count = 主節(jié)點(diǎn)數(shù) 1 + 鏡像節(jié)點(diǎn)數(shù),集群會(huì)盡可能的維護(hù)這個(gè)數(shù)值,如果鏡像節(jié)點(diǎn)出現(xiàn)故障,就在另一個(gè)節(jié)點(diǎn)上創(chuàng)建鏡像,比較建議這種模式,可以設(shè)置為 N/2 + 1
- all 全部模式,count = N,主節(jié)點(diǎn)外全部都是鏡像節(jié)點(diǎn)
- nodes 模式,指定鏡像節(jié)點(diǎn)名稱列表,隨機(jī)一個(gè)作為主節(jié)點(diǎn),如果列表里的節(jié)點(diǎn)都不存在或不可用,則創(chuàng)建隊(duì)列時(shí)的節(jié)點(diǎn)作為主節(jié)點(diǎn),之后訪問集群,列表中的節(jié)點(diǎn)若存在才會(huì)創(chuàng)建鏡像節(jié)點(diǎn)
沒有鏡像節(jié)點(diǎn)其實(shí)就相當(dāng)于普通模式了
如何配置上網(wǎng)搜搜就行,比較麻煩,需要設(shè)置策略,以及匹配的隊(duì)列(不同隊(duì)列分開來討論,可以設(shè)置不同的策略)
3、仲裁隊(duì)列
RabbitMQ 3.8 以后,推出了新的功能仲裁隊(duì)列來
- 代替鏡像集群,都是主從模式,支持主從數(shù)據(jù)同步,默認(rèn)是 exactly count = 5
- 約定大于配置,使用非常簡(jiǎn)單沒有復(fù)雜的配置,隊(duì)列的類型選擇 Quorum 即可
- 底層采用 Raft 協(xié)議確保主從的數(shù)據(jù)強(qiáng)一致性
Spring Boot 配置:
仲裁隊(duì)列聲明:
@RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = "xxx"), value = @Queue(name = "xxx", arguments = @Argument(name = "x-queue-type", value = "quorum")), key = "xxx" ))
隊(duì)列不聲明默認(rèn)就是普通集群,這里聲明的仲裁隊(duì)列也只是針對(duì)一個(gè)隊(duì)列
(6)消息重復(fù)消費(fèi)問題
在保證MQ消息不重復(fù)的情況下,MQ 的一條消息被消費(fèi)者消費(fèi)了多次
消費(fèi)者消費(fèi)消息成功后,在給MQ發(fā)送消息確認(rèn)的時(shí)候出現(xiàn)了網(wǎng)絡(luò)異?;蛘呤欠?wù)宕機(jī),MQ 遲遲沒有接收到 ACK 也沒有 NACK,此時(shí) MQ 不會(huì)將發(fā)送的消息刪除,按兵不動(dòng),消費(fèi)者重新監(jiān)聽或者有其他消費(fèi)者的時(shí)候,交由它消費(fèi),而這條消息如果在之前就消費(fèi)過了的話,則會(huì)導(dǎo)致重復(fù)消費(fèi)
解決方案:
- 消息消費(fèi)的業(yè)務(wù)本身具有冪等性,再次處理相同消息時(shí)不會(huì)產(chǎn)生副作用,一些時(shí)候可能需要用到分布式鎖去維護(hù)冪等性
- 比如一個(gè)訂單的狀態(tài)設(shè)置為結(jié)束,那重復(fù)消費(fèi)的結(jié)果一致
- 記錄消息的唯一標(biāo)識(shí),如果消費(fèi)過了的,則不再消費(fèi)
- 消費(fèi)成功將 id 緩存起來,消費(fèi)時(shí)查詢緩存里是否有這條消息
- 設(shè)置允許的緩存時(shí)間時(shí),你不必想得太極端,一般很快就有消費(fèi)者繼續(xù)監(jiān)聽拿到消息,哪怕真有那個(gè)情況,這里帶來的損失大概率可以忽略不記了,一切要結(jié)合實(shí)際情況!
有時(shí)候兩種方案沒有嚴(yán)格的界定
到此這篇關(guān)于Spring3 中 RabbitMQ 的使用與常見場(chǎng)景的文章就介紹到這了,更多相關(guān)Spring3 RabbitMQ 使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot 整合 RabbitMQ 的使用方式(代碼示例)
- RabbitMQ在Spring Boot中的使用步驟
- Springboot RabbitMQ 消息隊(duì)列使用示例詳解
- Spring Boot中RabbitMQ自動(dòng)配置的介紹、原理和使用方法
- 詳解SpringBoot中使用RabbitMQ的RPC功能
- SpringMVC和rabbitmq集成的使用案例
- SpringBoot+RabbitMq具體使用的幾種姿勢(shì)
- 詳解Spring Cloud Stream使用延遲消息實(shí)現(xiàn)定時(shí)任務(wù)(RabbitMQ)
- SpringBoot之RabbitMQ的使用方法
- spring boot使用RabbitMQ實(shí)現(xiàn)topic 主題
相關(guān)文章
SpringCloud遠(yuǎn)程服務(wù)調(diào)用實(shí)戰(zhàn)筆記
本文給大家介紹SpringCloud遠(yuǎn)程服務(wù)調(diào)用實(shí)戰(zhàn)筆記,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-11-11SpringBoot整合MongoDB實(shí)現(xiàn)文檔存儲(chǔ)功能
MongoDB是可以應(yīng)用于各種規(guī)模的企業(yè)、各個(gè)行業(yè)以及各類應(yīng)用程序的開源數(shù)據(jù)庫,本文將結(jié)合MongoDB和SpringBoot實(shí)現(xiàn)文檔存儲(chǔ)功能,需要的可以參考下2024-12-12如何在JDK 9中更簡(jiǎn)潔使用 try-with-resources 語句
本文詳細(xì)介紹了自 JDK 7 引入的 try-with-resources 語句的原理和用法,以及介紹了 JDK 9 對(duì) try-with-resources 的改進(jìn),使得用戶可以更加方便、簡(jiǎn)潔的使用 try-with-resources 語句。,需要的朋友可以參考下2019-06-06JAVA PDF操作之實(shí)現(xiàn)截取N頁和多個(gè)PDF合并
這篇文章主要為大家詳細(xì)介紹了java關(guān)于PDF的一些操作,例如截取N頁并生成新文件,轉(zhuǎn)圖片以及多個(gè)PDF合并,文中的示例代碼講解詳細(xì),感興趣的可以了解下2025-01-01關(guān)于@JsonProperty,@NotNull,@JsonIgnore的具體使用
這篇文章主要介紹了關(guān)于@JsonProperty,@NotNull,@JsonIgnore的具體使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-08-08java單元測(cè)試JUnit框架原理與用法實(shí)例教程
這篇文章主要介紹了java單元測(cè)試JUnit框架原理與用法,結(jié)合實(shí)例形式較為詳細(xì)的分析了java單元測(cè)試JUnit框架的概念、原理、使用方法及相關(guān)注意事項(xiàng),需要的朋友可以參考下2017-11-11