亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

spring boot Rabbit高級(jí)教程(最新推薦)

 更新時(shí)間:2023年10月18日 12:03:18   作者:過(guò)去日記  
RabbitMQ的消息過(guò)期是基于追溯方式來(lái)實(shí)現(xiàn)的,也就是說(shuō)當(dāng)一個(gè)消息的TTL到期以后不一定會(huì)被移除或投遞到死信交換機(jī),而是在消息恰好處于隊(duì)首時(shí)才會(huì)被處理,本篇文章給大家介紹spring boot Rabbit高級(jí)教程,感興趣的朋友一起看看吧

消息可靠性

生產(chǎn)者重試機(jī)制

首先第一種情況,就是生產(chǎn)者發(fā)送消息時(shí),出現(xiàn)了網(wǎng)絡(luò)故障,導(dǎo)致與MQ的連接中斷。

為了解決這個(gè)問(wèn)題,SpringAMQP提供的消息發(fā)送時(shí)的重試機(jī)制。即:當(dāng)RabbitTemplate與MQ連接超時(shí)后,多次重試。

修改publisher模塊的application.yaml文件,添加下面的內(nèi)容:

spring:
  rabbitmq:
    connection-timeout: 1s # 設(shè)置MQ的連接超時(shí)時(shí)間
    template:
      retry:
        enabled: true # 開啟超時(shí)重試機(jī)制
        initial-interval: 1000ms # 失敗后的初始等待時(shí)間
        multiplier: 1 # 失敗后下次的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = initial-interval * multiplier
        max-attempts: 3 # 最大重試次數(shù)

注意:當(dāng)網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,利用重試機(jī)制可以有效提高消息發(fā)送的成功率。不過(guò)SpringAMQP提供的重試機(jī)制是阻塞式的重試,也就是說(shuō)多次重試等待的過(guò)程中,當(dāng)前線程是被阻塞的。
如果對(duì)于業(yè)務(wù)性能有要求,建議禁用重試機(jī)制。如果一定要使用,請(qǐng)合理配置等待時(shí)長(zhǎng)和重試次數(shù),當(dāng)然也可以考慮使用異步線程來(lái)執(zhí)行發(fā)送消息的代碼。
:::

生產(chǎn)者確認(rèn)機(jī)制

一般情況下,只要生產(chǎn)者與MQ之間的網(wǎng)路連接順暢,基本不會(huì)出現(xiàn)發(fā)送消息丟失的情況,因此大多數(shù)情況下我們無(wú)需考慮這種問(wèn)題。
不過(guò),在少數(shù)情況下,也會(huì)出現(xiàn)消息發(fā)送到MQ之后丟失的現(xiàn)象,比如:

  • MQ內(nèi)部處理消息的進(jìn)程發(fā)生了異常
  • 生產(chǎn)者發(fā)送消息到達(dá)MQ后未找到Exchange
  • 生產(chǎn)者發(fā)送消息到達(dá)MQ的Exchange后,未找到合適的Queue,因此無(wú)法路由

針對(duì)上述情況,RabbitMQ提供了生產(chǎn)者消息確認(rèn)機(jī)制,包括Publisher ConfirmPublisher Return兩種。在開啟確認(rèn)機(jī)制的情況下,當(dāng)生產(chǎn)者發(fā)送消息給MQ后,MQ會(huì)根據(jù)消息處理的情況返回不同的回執(zhí)。

  • 當(dāng)消息投遞到MQ,但是路由失敗時(shí),通過(guò)Publisher Return返回異常信息,同時(shí)返回ack的確認(rèn)信息,代表投遞成功
  • 臨時(shí)消息投遞到了MQ,并且入隊(duì)成功,返回ACK,告知投遞成功
  • 持久消息投遞到了MQ,并且入隊(duì)完成持久化,返回ACK ,告知投遞成功
  • 其它情況都會(huì)返回NACK,告知投遞失敗

其中acknack屬于Publisher Confirm機(jī)制,ack是投遞成功;nack是投遞失敗。而return則屬于Publisher Return機(jī)制。
默認(rèn)兩種機(jī)制都是關(guān)閉狀態(tài),需要通過(guò)配置文件來(lái)開啟。

在publisher模塊的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 開啟publisher confirm機(jī)制,并設(shè)置confirm類型
    publisher-returns: true # 開啟publisher return機(jī)制

這里publisher-confirm-type有三種模式可選:

  • none:關(guān)閉confirm機(jī)制
  • simple:同步阻塞等待MQ的回執(zhí)
  • correlated:MQ異步回調(diào)返回回執(zhí)

一般我們推薦使用correlated,回調(diào)機(jī)制。

定義ReturnCallback

每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此我們可以在配置類中統(tǒng)一設(shè)置。我們?cè)趐ublisher模塊定義一個(gè)配置類:

內(nèi)容如下:

package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("觸發(fā)return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

定義ConfirmCallback

由于每個(gè)消息發(fā)送時(shí)的處理邏輯不一定相同,因此ConfirmCallback需要在每次發(fā)消息時(shí)定義。具體來(lái)說(shuō),是在調(diào)用RabbitTemplate中的convertAndSend方法時(shí),多傳遞一個(gè)參數(shù):

這里的CorrelationData中包含兩個(gè)核心的東西:

  • id:消息的唯一標(biāo)示,MQ對(duì)不同的消息的回執(zhí)以此做判斷,避免混淆
  • SettableListenableFuture:回執(zhí)結(jié)果的Future對(duì)象

將來(lái)MQ的回執(zhí)就會(huì)通過(guò)這個(gè)Future來(lái)返回,我們可以提前給CorrelationData中的Future添加回調(diào)函數(shù)來(lái)處理消息回執(zhí):

我們新建一個(gè)測(cè)試,向系統(tǒng)自帶的交換機(jī)發(fā)送消息,并且添加ConfirmCallback

@Test
void testPublisherConfirm() {
    // 1.創(chuàng)建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.給Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future發(fā)生異常時(shí)的處理邏輯,基本不會(huì)觸發(fā)
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回執(zhí)的處理邏輯,參數(shù)中的result就是回執(zhí)內(nèi)容
            if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執(zhí),false 代表 nack回執(zhí)
                log.debug("發(fā)送消息成功,收到 ack!");
            }else{ // result.getReason(),String類型,返回nack時(shí)的異常描述
                log.error("發(fā)送消息失敗,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.發(fā)送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

執(zhí)行結(jié)果如下:

可以看到,由于傳遞的RoutingKey是錯(cuò)誤的,路由失敗后,觸發(fā)了return callback,同時(shí)也收到了ack。
當(dāng)我們修改為正確的RoutingKey以后,就不會(huì)觸發(fā)return callback了,只收到ack。
而如果連交換機(jī)都是錯(cuò)誤的,則只會(huì)收到nack。

注意
開啟生產(chǎn)者確認(rèn)比較消耗MQ性能,一般不建議開啟。而且大家思考一下觸發(fā)確認(rèn)的幾種情況:

  • 路由失敗:一般是因?yàn)镽outingKey錯(cuò)誤導(dǎo)致,往往是編程導(dǎo)致
  • 交換機(jī)名稱錯(cuò)誤:同樣是編程錯(cuò)誤導(dǎo)致
  • MQ內(nèi)部故障:這種需要處理,但概率往往較低。因此只有對(duì)消息可靠性要求非常高的業(yè)務(wù)才需要開啟,而且僅僅需要開啟ConfirmCallback處理nack就可以了。

數(shù)據(jù)持久化

為了提升性能,默認(rèn)情況下MQ的數(shù)據(jù)都是在內(nèi)存存儲(chǔ)的臨時(shí)數(shù)據(jù),重啟后就會(huì)消失。為了保證數(shù)據(jù)的可靠性,必須配置數(shù)據(jù)持久化,包括:

  • 交換機(jī)持久化
  • 隊(duì)列持久化
  • 消息持久化

我們以控制臺(tái)界面為例來(lái)說(shuō)明。

交換機(jī)持久化

在控制臺(tái)的Exchanges頁(yè)面,添加交換機(jī)時(shí)可以配置交換機(jī)的Durability參數(shù):

設(shè)置為Durable就是持久化模式,Transient就是臨時(shí)模式。

隊(duì)列持久化

在控制臺(tái)的Queues頁(yè)面,添加隊(duì)列時(shí),同樣可以配置隊(duì)列的Durability參數(shù):

除了持久化以外。

消息持久化

在控制臺(tái)發(fā)送消息的時(shí)候,可以添加很多參數(shù),而消息的持久化是要配置一個(gè)properties

說(shuō)明:在開啟持久化機(jī)制以后,如果同時(shí)還開啟了生產(chǎn)者確認(rèn),那么MQ會(huì)在消息持久化以后才發(fā)送ACK回執(zhí),進(jìn)一步確保消息的可靠性。
不過(guò)出于性能考慮,為了減少IO次數(shù),發(fā)送到MQ的消息并不是逐條持久化到數(shù)據(jù)庫(kù)的,而是每隔一段時(shí)間批量持久化。一般間隔在100毫秒左右,這就會(huì)導(dǎo)致ACK有一定的延遲,因此建議生產(chǎn)者確認(rèn)全部采用異步方式。

LazyQueue

在默認(rèn)情況下,RabbitMQ會(huì)將接收到的信息保存在內(nèi)存中以降低消息收發(fā)的延遲。但在某些特殊情況下,這會(huì)導(dǎo)致消息積壓,比如:

  • 消費(fèi)者宕機(jī)或出現(xiàn)網(wǎng)絡(luò)故障
  • 消息發(fā)送量激增,超過(guò)了消費(fèi)者處理速度
  • 消費(fèi)者處理業(yè)務(wù)發(fā)生阻塞

一旦出現(xiàn)消息堆積問(wèn)題,RabbitMQ的內(nèi)存占用就會(huì)越來(lái)越高,直到觸發(fā)內(nèi)存預(yù)警上限。此時(shí)RabbitMQ會(huì)將內(nèi)存消息刷到磁盤上,這個(gè)行為成為PageOut. PageOut會(huì)耗費(fèi)一段時(shí)間,并且會(huì)阻塞隊(duì)列進(jìn)程。因此在這個(gè)過(guò)程中RabbitMQ不會(huì)再處理新的消息,生產(chǎn)者的所有請(qǐng)求都會(huì)被阻塞。

為了解決這個(gè)問(wèn)題,從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的模式,也就是惰性隊(duì)列。惰性隊(duì)列的特征如下:

  • 接收到消息后直接存入磁盤而非內(nèi)存
  • 消費(fèi)者要消費(fèi)消息時(shí)才會(huì)從磁盤中讀取并加載到內(nèi)存(也就是懶加載)
  • 支持?jǐn)?shù)百萬(wàn)條的消息存儲(chǔ)

而在3.12版本之后,LazyQueue已經(jīng)成為所有隊(duì)列的默認(rèn)格式。因此官方推薦升級(jí)MQ為3.12版本或者所有隊(duì)列都設(shè)置為L(zhǎng)azyQueue模式。

控制臺(tái)配置Lazy模式

在添加隊(duì)列的時(shí)候,添加x-queue-mod=lazy參數(shù)即可設(shè)置隊(duì)列為L(zhǎng)azy模式:

代碼配置Lazy模式

在利用SpringAMQP聲明隊(duì)列的時(shí)候,添加x-queue-mod=lazy參數(shù)也可設(shè)置隊(duì)列為L(zhǎng)azy模式:

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 開啟Lazy模式
            .build();
}

這里是通過(guò)QueueBuilderlazy()函數(shù)配置Lazy模式。
當(dāng)然,我們也可以基于注解來(lái)聲明隊(duì)列并設(shè)置為L(zhǎng)azy模式:

@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

更新已有隊(duì)列為lazy模式

對(duì)于已經(jīng)存在的隊(duì)列,也可以配置為lazy模式,但是要通過(guò)設(shè)置policy實(shí)現(xiàn)。
可以基于命令行設(shè)置policy:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解讀:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一個(gè)策略
  • Lazy :策略名稱,可以自定義
  • "^lazy-queue$" :用正則表達(dá)式匹配隊(duì)列的名字
  • '{"queue-mode":"lazy"}' :設(shè)置隊(duì)列模式為lazy模式
  • --apply-to queues:策略的作用對(duì)象,是所有的隊(duì)列

當(dāng)然,也可以在控制臺(tái)配置policy,進(jìn)入在控制臺(tái)的Admin頁(yè)面,點(diǎn)擊Policies,即可添加配置:

消費(fèi)者的可靠性

當(dāng)RabbitMQ向消費(fèi)者投遞消息以后,需要知道消費(fèi)者的處理狀態(tài)如何。因?yàn)橄⑼哆f給消費(fèi)者并不代表就一定被正確消費(fèi)了,可能出現(xiàn)的故障有很多,比如:

  • 消息投遞的過(guò)程中出現(xiàn)了網(wǎng)絡(luò)故障
  • 消費(fèi)者接收到消息后突然宕機(jī)
  • 消費(fèi)者接收到消息后,因處理不當(dāng)導(dǎo)致異常

一旦發(fā)生上述情況,消息也會(huì)丟失。因此,RabbitMQ必須知道消費(fèi)者的處理狀態(tài),一旦消息處理失敗才能重新投遞消息。
但問(wèn)題來(lái)了:RabbitMQ如何得知消費(fèi)者的處理狀態(tài)呢?

本章我們就一起研究一下消費(fèi)者處理消息時(shí)的可靠性解決方案。

消費(fèi)者確認(rèn)機(jī)制

為了確認(rèn)消費(fèi)者是否成功處理消息,RabbitMQ提供了消費(fèi)者確認(rèn)機(jī)制(Consumer Acknowledgement)。即:當(dāng)消費(fèi)者處理消息結(jié)束后,應(yīng)該向RabbitMQ發(fā)送一個(gè)回執(zhí),告知RabbitMQ自己消息處理狀態(tài)?;貓?zhí)有三種可選值:

  • ack:成功處理消息,RabbitMQ從隊(duì)列中刪除該消息
  • nack:消息處理失敗,RabbitMQ需要再次投遞消息
  • reject:消息處理失敗并拒絕該消息,RabbitMQ從隊(duì)列中刪除該消息

一般reject方式用的較少,除非是消息格式有問(wèn)題,那就是開發(fā)問(wèn)題了。因此大多數(shù)情況下我們需要將消息處理的代碼通過(guò)try catch機(jī)制捕獲,消息處理成功時(shí)返回ack,處理失敗時(shí)返回nack.

由于消息回執(zhí)的處理代碼比較統(tǒng)一,因此SpringAMQP幫我們實(shí)現(xiàn)了消息確認(rèn)。并允許我們通過(guò)配置文件設(shè)置ACK處理方式,有三種模式:

**none**:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用
**manual**:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ack或reject,存在業(yè)務(wù)入侵,但更靈活
**auto**:自動(dòng)模式。SpringAMQP利用AOP對(duì)我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack;
如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject;
————————————————
版權(quán)聲明:本文為CSDN博主「過(guò)去日記」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/studycodeday/article/details/133839942

  • **none**:不處理。即消息投遞給消費(fèi)者后立刻ack,消息會(huì)立刻從MQ刪除。非常不安全,不建議使用
  • **manual**:手動(dòng)模式。需要自己在業(yè)務(wù)代碼中調(diào)用api,發(fā)送ackreject,存在業(yè)務(wù)入侵,但更靈活
  • **auto**:自動(dòng)模式。SpringAMQP利用AOP對(duì)我們的消息處理邏輯做了環(huán)繞增強(qiáng),當(dāng)業(yè)務(wù)正常執(zhí)行時(shí)則自動(dòng)返回ack. 當(dāng)業(yè)務(wù)出現(xiàn)異常時(shí),根據(jù)異常判斷返回不同結(jié)果:
    • 如果是業(yè)務(wù)異常,會(huì)自動(dòng)返回nack;
    • 如果是消息處理或校驗(yàn)異常,自動(dòng)返回reject;

通過(guò)下面的配置可以修改SpringAMQP的ACK處理方式:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做處理

修改consumer服務(wù)的SpringRabbitListener類中的方法,模擬一個(gè)消息處理的異常:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
    if (true) {
        throw new MessageConversionException("故意的");
    }
    log.info("消息處理完成");
}

測(cè)試可以發(fā)現(xiàn):當(dāng)消息處理發(fā)生異常時(shí),消息依然被RabbitMQ刪除了。

我們?cè)俅伟汛_認(rèn)機(jī)制修改為auto:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自動(dòng)ack

在異常位置打斷點(diǎn),再次發(fā)送消息,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked(未確定狀態(tài)):

放行以后,由于拋出的是消息轉(zhuǎn)換異常,因此Spring會(huì)自動(dòng)返回reject,所以消息依然會(huì)被刪除

我們將異常改為RuntimeException類型:

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消費(fèi)者接收到消息:【" + msg + "】");
    if (true) {
        throw new RuntimeException("故意的");
    }
    log.info("消息處理完成");
}

在異常位置打斷點(diǎn),然后再次發(fā)送消息測(cè)試,程序卡在斷點(diǎn)時(shí),可以發(fā)現(xiàn)此時(shí)消息狀態(tài)為unacked(未確定狀態(tài))以后,由于拋出的是業(yè)務(wù)異常,所以Spring返回ack,最終消息恢復(fù)至Ready狀態(tài),并且沒(méi)有被RabbitMQ刪除
當(dāng)我們把配置改為auto時(shí),消息處理失敗后,會(huì)回到RabbitMQ,并重新投遞到消費(fèi)者。

失敗重試機(jī)制

當(dāng)消費(fèi)者出現(xiàn)異常后,消息會(huì)不斷requeue(重入隊(duì))到隊(duì)列,再重新發(fā)送給消費(fèi)者。如果消費(fèi)者再次執(zhí)行依然出錯(cuò),消息會(huì)再次requeue到隊(duì)列,再次投遞,直到消息處理成功為止。
極端情況就是消費(fèi)者一直無(wú)法執(zhí)行成功,那么消息requeue就會(huì)無(wú)限循環(huán),導(dǎo)致mq的消息處理飆升,帶來(lái)不必要的壓力:

image.png

當(dāng)然,上述極端情況發(fā)生的概率還是非常低的,不過(guò)不怕一萬(wàn)就怕萬(wàn)一。為了應(yīng)對(duì)上述情況Spring又提供了消費(fèi)者失敗重試機(jī)制:在消費(fèi)者出現(xiàn)異常時(shí)利用本地重試,而不是無(wú)限制的requeue到mq隊(duì)列。

修改consumer服務(wù)的application.yml文件,添加內(nèi)容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 開啟消費(fèi)者失敗重試
          initial-interval: 1000ms # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒
          multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval
          max-attempts: 3 # 最大重試次數(shù)
          stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false

重啟consumer服務(wù),重復(fù)之前的測(cè)試??梢园l(fā)現(xiàn):

  • 消費(fèi)者在失敗后消息沒(méi)有重新回到MQ無(wú)限重新投遞,而是在本地重試了3次
  • 本地重試3次以后,拋出了AmqpRejectAndDontRequeueException異常。查看RabbitMQ控制臺(tái),發(fā)現(xiàn)消息被刪除了,說(shuō)明最后SpringAMQP返回的是reject

結(jié)論:

  • 開啟本地重試時(shí),消息處理過(guò)程中拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
  • 重試達(dá)到最大次數(shù)后,Spring會(huì)返回reject,消息會(huì)被丟棄

失敗處理策略

在之前的測(cè)試中,本地測(cè)試達(dá)到最大重試次數(shù)后,消息會(huì)被丟棄。這在某些對(duì)于消息可靠性要求較高的業(yè)務(wù)場(chǎng)景下,顯然不太合適了。
因此Spring允許我們自定義重試次數(shù)耗盡后的消息處理策略,這個(gè)策略是由MessageRecovery接口來(lái)定義的,它有3個(gè)不同實(shí)現(xiàn):

  • RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
  • ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
  • RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)

比較優(yōu)雅的一種處理方案是RepublishMessageRecoverer,失敗后將消息投遞到一個(gè)指定的,專門存放異常消息的隊(duì)列,后續(xù)由人工集中處理。

1)在consumer服務(wù)中定義處理失敗消息的交換機(jī)和隊(duì)列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定義一個(gè)RepublishMessageRecoverer,關(guān)聯(lián)隊(duì)列和交換機(jī)

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

完整代碼如下:

package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

業(yè)務(wù)冪等性

冪等是一個(gè)數(shù)學(xué)概念,用函數(shù)表達(dá)式來(lái)描述是這樣的:f(x) = f(f(x)),例如求絕對(duì)值函數(shù)。
在程序開發(fā)中,則是指同一個(gè)業(yè)務(wù),執(zhí)行一次或多次對(duì)業(yè)務(wù)狀態(tài)的影響是一致的。例如:

  • 根據(jù)id刪除數(shù)據(jù)
  • 查詢數(shù)據(jù)
  • 新增數(shù)據(jù)

但數(shù)據(jù)的更新往往不是冪等的,如果重復(fù)執(zhí)行可能造成不一樣的后果。比如:

  • 取消訂單,恢復(fù)庫(kù)存的業(yè)務(wù)。如果多次恢復(fù)就會(huì)出現(xiàn)庫(kù)存重復(fù)增加的情況
  • 退款業(yè)務(wù)。重復(fù)退款對(duì)商家而言會(huì)有經(jīng)濟(jì)損失。

所以,我們要盡可能避免業(yè)務(wù)被重復(fù)執(zhí)行。
然而在實(shí)際業(yè)務(wù)場(chǎng)景中,由于意外經(jīng)常會(huì)出現(xiàn)業(yè)務(wù)被重復(fù)執(zhí)行的情況,例如:

  • 頁(yè)面卡頓時(shí)頻繁刷新導(dǎo)致表單重復(fù)提交
  • 服務(wù)間調(diào)用的重試
  • MQ消息的重復(fù)投遞

我們?cè)谟脩糁Ц冻晒髸?huì)發(fā)送MQ消息到交易服務(wù),修改訂單狀態(tài)為已支付,就可能出現(xiàn)消息重復(fù)投遞的情況。如果消費(fèi)者不做判斷,很有可能導(dǎo)致消息被消費(fèi)多次,出現(xiàn)業(yè)務(wù)故障。
舉例:

  • 假如用戶剛剛支付完成,并且投遞消息到交易服務(wù),交易服務(wù)更改訂單為已支付狀態(tài)。
  • 由于某種原因,例如網(wǎng)絡(luò)故障導(dǎo)致生產(chǎn)者沒(méi)有得到確認(rèn),隔了一段時(shí)間后重新投遞給交易服務(wù)。
  • 但是,在新投遞的消息被消費(fèi)之前,用戶選擇了退款,將訂單狀態(tài)改為了已退款狀態(tài)。
  • 退款完成后,新投遞的消息才被消費(fèi),那么訂單狀態(tài)會(huì)被再次改為已支付。業(yè)務(wù)異常。

因此,我們必須想辦法保證消息處理的冪等性。這里給出兩種方案:

  • 唯一消息ID
  • 業(yè)務(wù)狀態(tài)判斷

唯一消息ID

這個(gè)思路非常簡(jiǎn)單:

  • 每一條消息都生成一個(gè)唯一的id,與消息一起投遞給消費(fèi)者。
  • 消費(fèi)者接收到消息后處理自己的業(yè)務(wù),業(yè)務(wù)處理成功后將消息ID保存到數(shù)據(jù)庫(kù)
  • 如果下次又收到相同消息,去數(shù)據(jù)庫(kù)查詢判斷是否存在,存在則為重復(fù)消息放棄處理。

我們?cè)撊绾谓o消息添加唯一ID呢?
其實(shí)很簡(jiǎn)單,SpringAMQP的MessageConverter自帶了MessageID的功能,我們只要開啟這個(gè)功能即可。
以Jackson的消息轉(zhuǎn)換器為例:

@Bean
public MessageConverter messageConverter(){
    // 1.定義消息轉(zhuǎn)換器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

業(yè)務(wù)判斷

業(yè)務(wù)判斷就是基于業(yè)務(wù)本身的邏輯或狀態(tài)來(lái)判斷是否是重復(fù)的請(qǐng)求或消息,不同的業(yè)務(wù)場(chǎng)景判斷的思路也不一樣。
處理消息的業(yè)務(wù)邏輯是把訂單狀態(tài)從未支付修改為已支付。因此我們就可以在執(zhí)行業(yè)務(wù)時(shí)判斷訂單狀態(tài)是否是未支付,如果不是則證明訂單已經(jīng)被處理過(guò),無(wú)需重復(fù)處理。

相比較而言,消息ID的方案需要改造原有的數(shù)據(jù)庫(kù),所以我更推薦使用業(yè)務(wù)判斷的方案。

以支付修改訂單的業(yè)務(wù)為例,我們需要修改OrderServiceImpl中的markOrderPaySuccess方法:

    @Override
    public void markOrderPaySuccess(Long orderId) {
        // 1.查詢訂單
        Order old = getById(orderId);
        // 2.判斷訂單狀態(tài)
        if (old == null || old.getStatus() != 1) {
            // 訂單不存在或者訂單狀態(tài)不是1,放棄處理
            return;
        }
        // 3.嘗試更新訂單
        Order order = new Order();
        order.setId(orderId);
        order.setStatus(2);
        order.setPayTime(LocalDateTime.now());
        updateById(order);
    }

上述代碼邏輯上符合了冪等判斷的需求,但是由于判斷和更新是兩步動(dòng)作,因此在極小概率下可能存在線程安全問(wèn)題。

我們可以合并上述操作為這樣:

@Override
public void markOrderPaySuccess(Long orderId) {
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    lambdaUpdate()
            .set(Order::getStatus, 2)
            .set(Order::getPayTime, LocalDateTime.now())
            .eq(Order::getId, orderId)
            .eq(Order::getStatus, 1)
            .update();
}

注意看,上述代碼等同于這樣的SQL語(yǔ)句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我們?cè)趙here條件中除了判斷id以外,還加上了status必須為1的條件。如果條件不符(說(shuō)明訂單已支付),則SQL匹配不到數(shù)據(jù),根本不會(huì)執(zhí)行。

兜底方案

其實(shí)思想很簡(jiǎn)單:既然MQ通知不一定發(fā)送到交易服務(wù),那么交易服務(wù)就必須自己主動(dòng)去查詢支付狀態(tài)。這樣即便支付服務(wù)的MQ通知失敗,我們依然能通過(guò)主動(dòng)查詢來(lái)保證訂單狀態(tài)的一致。
流程如下:

圖中黃色線圈起來(lái)的部分就是MQ通知失敗后的兜底處理方案,由交易服務(wù)自己主動(dòng)去查詢支付狀態(tài)。

不過(guò)需要注意的是,交易服務(wù)并不知道用戶會(huì)在什么時(shí)候支付,如果查詢的時(shí)機(jī)不正確(比如查詢的時(shí)候用戶正在支付中),可能查詢到的支付狀態(tài)也不正確。
那么問(wèn)題來(lái)了,我們到底該在什么時(shí)間主動(dòng)查詢支付狀態(tài)呢?

這個(gè)時(shí)間是無(wú)法確定的,因此,通常我們采取的措施就是利用定時(shí)任務(wù)定期查詢.

  • 首先,支付服務(wù)會(huì)正在用戶支付成功以后利用MQ消息通知交易服務(wù),完成訂單狀態(tài)同步。
  • 其次,為了保證MQ消息的可靠性,我們采用了生產(chǎn)者確認(rèn)機(jī)制、消費(fèi)者確認(rèn)、消費(fèi)者失敗重試等策略,確保消息投遞的可靠性
  • 最后,我們還在交易服務(wù)設(shè)置了定時(shí)任務(wù),定期查詢訂單支付狀態(tài)。這樣即便MQ通知失敗,還可以利用定時(shí)任務(wù)作為兜底方案,確保訂單支付狀態(tài)的最終一致性。

延遲消息

在電商的支付業(yè)務(wù)中,對(duì)于一些庫(kù)存有限的商品,為了更好的用戶體驗(yàn),通常都會(huì)在用戶下單時(shí)立刻扣減商品庫(kù)存。例如電影院購(gòu)票、高鐵購(gòu)票,下單后就會(huì)鎖定座位資源,其他人無(wú)法重復(fù)購(gòu)買。

但是這樣就存在一個(gè)問(wèn)題,假如用戶下單后一直不付款,就會(huì)一直占有庫(kù)存資源,導(dǎo)致其他客戶無(wú)法正常交易,最終導(dǎo)致商戶利益受損!

因此,電商中通常的做法就是:對(duì)于超過(guò)一定時(shí)間未支付的訂單,應(yīng)該立刻取消訂單并釋放占用的庫(kù)存。

例如,訂單支付超時(shí)時(shí)間為30分鐘,則我們應(yīng)該在用戶下單后的第30分鐘檢查訂單支付狀態(tài),如果發(fā)現(xiàn)未支付,應(yīng)該立刻取消訂單,釋放庫(kù)存。

但問(wèn)題來(lái)了:如何才能準(zhǔn)確的實(shí)現(xiàn)在下單后第30分鐘去檢查支付狀態(tài)呢?

像這種在一段時(shí)間以后才執(zhí)行的任務(wù),我們稱之為延遲任務(wù),而要實(shí)現(xiàn)延遲任務(wù),最簡(jiǎn)單的方案就是利用MQ的延遲消息了。

在RabbitMQ中實(shí)現(xiàn)延遲消息也有兩種方案:

  • 死信交換機(jī)+TTL
  • 延遲消息插件

這一章我們就一起研究下這兩種方案的實(shí)現(xiàn)方式,以及優(yōu)缺點(diǎn)。

死信交換機(jī)和延遲消息

死信交換機(jī)

當(dāng)一個(gè)隊(duì)列中的消息滿足下列情況之一時(shí),可以成為死信(dead letter):

  • 消費(fèi)者使用basic.reject或 basic.nack聲明消費(fèi)失敗,并且消息的requeue參數(shù)設(shè)置為false
  • 消息是一個(gè)過(guò)期消息,超時(shí)無(wú)人消費(fèi)
  • 要投遞的隊(duì)列消息滿了,無(wú)法投遞

如果一個(gè)隊(duì)列中的消息已經(jīng)成為死信,并且這個(gè)隊(duì)列通過(guò)**dead-letter-exchange**屬性指定了一個(gè)交換機(jī),那么隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,而這個(gè)交換機(jī)就稱為死信交換機(jī)(Dead Letter Exchange)。而此時(shí)加入有隊(duì)列與死信交換機(jī)綁定,則最終死信就會(huì)被投遞到這個(gè)隊(duì)列中。

死信交換機(jī)有什么作用呢?

  • 收集那些因處理失敗而被拒絕的消息
  • 收集那些因隊(duì)列滿了而被拒絕的消息
  • 收集因TTL(有效期)到期的消息

延遲消息

前面兩種作用場(chǎng)景可以看做是把死信交換機(jī)當(dāng)做一種消息處理的最終兜底方案,與消費(fèi)者重試時(shí)講的RepublishMessageRecoverer作用類似。

而最后一種場(chǎng)景,大家設(shè)想一下這樣的場(chǎng)景:
如圖,有一組綁定的交換機(jī)(ttl.fanout)和隊(duì)列(ttl.queue)。但是ttl.queue沒(méi)有消費(fèi)者監(jiān)聽,而是設(shè)定了死信交換機(jī)hmall.direct,而隊(duì)列direct.queue1則與死信交換機(jī)綁定,RoutingKey是blue:

假如我們現(xiàn)在發(fā)送一條消息到ttl.fanout,RoutingKey為blue,并設(shè)置消息的有效期為5000毫秒:

image.png

注意:盡管這里的ttl.fanout不需要RoutingKey,但是當(dāng)消息變?yōu)樗佬挪⑼哆f到死信交換機(jī)時(shí),會(huì)沿用之前的RoutingKey,這樣hmall.direct才能正確路由消息。

消息肯定會(huì)被投遞到ttl.queue之后,由于沒(méi)有消費(fèi)者,因此消息無(wú)人消費(fèi)。5秒之后,消息的有效期到期,成為死信:

死信被再次投遞到死信交換機(jī)hmall.direct,并沿用之前的RoutingKey,也就是blue

由于direct.queue1hmall.direct綁定的key是blue,因此最終消息被成功路由到direct.queue1,如果此時(shí)有消費(fèi)者與direct.queue1綁定, 也就能成功消費(fèi)消息了。但此時(shí)已經(jīng)是5秒鐘以后了:

也就是說(shuō),publisher發(fā)送了一條消息,但最終consumer在5秒后才收到消息。我們成功實(shí)現(xiàn)了延遲消息。

總結(jié)

注意:
RabbitMQ的消息過(guò)期是基于追溯方式來(lái)實(shí)現(xiàn)的,也就是說(shuō)當(dāng)一個(gè)消息的TTL到期以后不一定會(huì)被移除或投遞到死信交換機(jī),而是在消息恰好處于隊(duì)首時(shí)才會(huì)被處理。
當(dāng)隊(duì)列中消息堆積很多的時(shí)候,過(guò)期消息可能不會(huì)被按時(shí)處理,因此你設(shè)置的TTL時(shí)間不一定準(zhǔn)確。
:::

DelayExchange插件

基于死信隊(duì)列雖然可以實(shí)現(xiàn)延遲消息,但是太麻煩了。因此RabbitMQ社區(qū)提供了一個(gè)延遲消息插件來(lái)實(shí)現(xiàn)相同的效果。
官方文檔說(shuō)明:
Scheduling Messages with RabbitMQ | RabbitMQ - Blog

下載

插件下載地址:
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

安裝

因?yàn)槲覀兪腔贒ocker安裝,所以需要先查看RabbitMQ的插件目錄對(duì)應(yīng)的數(shù)據(jù)卷。

docker volume inspect mq-plugins

結(jié)果如下:

[
    {
        "CreatedAt": "2024-06-19T09:22:59+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

插件目錄被掛載到了/var/lib/docker/volumes/mq-plugins/_data這個(gè)目錄,我們上傳插件到該目錄下。

接下來(lái)執(zhí)行命令,安裝插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

運(yùn)行結(jié)果如下:

聲明延遲交換機(jī)

基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延遲消息:{}", msg);
}

基于@Bean的方式:

package com.itheima.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交換機(jī)類型和名稱
                .delayed() // 設(shè)置delay的屬性為true
                .durable(true) // 持久化
                .build();
    }
    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

發(fā)送延遲消息

發(fā)送消息時(shí),必須通過(guò)x-delay屬性設(shè)定延遲時(shí)間:

@Test
void testPublisherDelayMessage() {
    // 1.創(chuàng)建消息
    String message = "hello, delayed message";
    // 2.發(fā)送消息,利用消息后置處理器添加消息頭
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延遲消息屬性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

:::warning
注意:
延遲消息插件內(nèi)部會(huì)維護(hù)一個(gè)本地?cái)?shù)據(jù)庫(kù)表,同時(shí)使用Elang Timers功能實(shí)現(xiàn)計(jì)時(shí)。如果消息的延遲時(shí)間設(shè)置較長(zhǎng),可能會(huì)導(dǎo)致堆積的延遲消息非常多,會(huì)帶來(lái)較大的CPU開銷,同時(shí)延遲消息的時(shí)間會(huì)存在誤差。
因此,不建議設(shè)置延遲時(shí)間過(guò)長(zhǎng)的延遲消息

到此這篇關(guān)于spring boot Rabbit高級(jí)教程的文章就介紹到這了,更多相關(guān)spring boot Rabbit內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot接收接口入?yún)⒌姆绞叫〗Y(jié)

    SpringBoot接收接口入?yún)⒌姆绞叫〗Y(jié)

    這篇文章主要給大家介紹了SpringBoot接收接口入?yún)⒌膸追N方式,我們從調(diào)用方的視角去看待這個(gè)問(wèn)題,對(duì)調(diào)用方來(lái)說(shuō),它在調(diào)用接口時(shí)有好幾種傳參方式,下面,將會(huì)依次對(duì)這幾種參數(shù)方式進(jìn)行講解和代碼示例,需要的朋友可以參考下
    2024-01-01
  • SpringBoot的HTTPS配置實(shí)現(xiàn)

    SpringBoot的HTTPS配置實(shí)現(xiàn)

    本文主要介紹了SpringBoot的HTTPS配置實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • Java并發(fā)編程之常用的多線程實(shí)現(xiàn)方式分析

    Java并發(fā)編程之常用的多線程實(shí)現(xiàn)方式分析

    這篇文章主要介紹了Java并發(fā)編程之常用的多線程實(shí)現(xiàn)方式,結(jié)合實(shí)例形式分析了java并發(fā)編程中多線程的相關(guān)原理、實(shí)現(xiàn)方法與操作注意事項(xiàng),需要的朋友可以參考下
    2020-02-02
  • bootstrap.yml如何讀取nacos配置中心的配置文件

    bootstrap.yml如何讀取nacos配置中心的配置文件

    這篇文章主要介紹了bootstrap.yml讀取nacos配置中心的配置文件問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • JAVA日常開發(fā)中一些常見(jiàn)問(wèn)題歸納講解

    JAVA日常開發(fā)中一些常見(jiàn)問(wèn)題歸納講解

    這篇文章主要給大家介紹了JAVA日常開發(fā)中一些常見(jiàn)問(wèn)題的相關(guān)資料,包括語(yǔ)法錯(cuò)誤、數(shù)據(jù)類型問(wèn)題、面向?qū)ο缶幊虇?wèn)題、集合類問(wèn)題以及文件操作問(wèn)題,通過(guò)詳細(xì)的分析和示例,幫助程序員提高代碼的健壯性和可維護(hù)性,需要的朋友可以參考下
    2024-12-12
  • 關(guān)于HashMap的put方法執(zhí)行全過(guò)程

    關(guān)于HashMap的put方法執(zhí)行全過(guò)程

    這篇文章主要介紹了關(guān)于HashMap的put方法執(zhí)行全過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-06-06
  • java實(shí)現(xiàn)小i機(jī)器人api接口調(diào)用示例

    java實(shí)現(xiàn)小i機(jī)器人api接口調(diào)用示例

    這篇文章主要介紹了java實(shí)現(xiàn)小i機(jī)器人api接口調(diào)用示例,需要的朋友可以參考下
    2014-04-04
  • java調(diào)用webservice的.asmx接口的使用步驟

    java調(diào)用webservice的.asmx接口的使用步驟

    這篇文章主要介紹了java調(diào)用webservice的.asmx接口的使用步驟,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-09-09
  • 一文揭曉如何在Java中終止一個(gè)線程

    一文揭曉如何在Java中終止一個(gè)線程

    工作中我們經(jīng)常會(huì)用到線程,一般情況下我們讓線程執(zhí)行就完事了,那么你們有沒(méi)有想過(guò)如何去終止一個(gè)正在運(yùn)行的線程呢?本文就來(lái)帶大家一起看看
    2023-03-03
  • IDEA EasyCode 一鍵幫你生成所需代碼

    IDEA EasyCode 一鍵幫你生成所需代碼

    這篇文章主要介紹了IDEA EasyCode 一鍵幫你生成所需代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08

最新評(píng)論