亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

RabbitMQ 如何解決消息冪等性的問題

 更新時(shí)間:2021年07月05日 10:00:00   作者:王小白_Ada  
這篇文章主要介紹了RabbitMQ 如何解決消息冪等性的問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

前言

關(guān)于MQ消費(fèi)者的冪等性問題,在于MQ的重試機(jī)制,因?yàn)榫W(wǎng)絡(luò)原因或客戶端延遲消費(fèi)導(dǎo)致重復(fù)消費(fèi)。使用MQ重試機(jī)制需要注意的事項(xiàng)以及如何解決消費(fèi)者冪等性問題以下將逐一講解。

1. RabbitMQ自動重試機(jī)制

消費(fèi)者在消費(fèi)消息的時(shí)候,如果消費(fèi)者業(yè)務(wù)邏輯出現(xiàn)程序異常,這個(gè)時(shí)候我們?nèi)绾翁幚恚?/p>

使用重試機(jī)制,RabbitMQ默認(rèn)開啟重試機(jī)制。

實(shí)現(xiàn)原理:

  • @RabbitHandler注解 底層使用Aop攔截,如果程序(消費(fèi)者)沒有拋出異常,自動提交事務(wù)
  • 如果Aop使用異常通知攔截獲取到異常后,自動實(shí)現(xiàn)補(bǔ)償機(jī)制,消息緩存在RabbitMQ服務(wù)器端

注意:

  • 默認(rèn)會一直重試到消費(fèi)者不拋異常為止,這樣顯然不好。我們需要修改重試機(jī)制策略,如間隔3s重試一次)

配置:

spring:
  rabbitmq:
    # 連接地址
    host: 127.0.0.1
    # 端口號
    port: 5672
    # 賬號
    username: guest
    # 密碼
    password: guest
    # 地址(類似于數(shù)據(jù)庫的概念)
    virtual-host: /admin_vhost
    # 消費(fèi)者監(jiān)聽相關(guān)配置
    listener:
      simple:
        retry:
          # 開啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開啟并一直重試
          enabled: true
          # 最大重試次數(shù)
          max-attempts: 5
          # 重試間隔時(shí)間(毫秒)
          initial-interval: 3000

2. 如何合理選擇重試機(jī)制?

情況1: 消費(fèi)者獲取到消息后,調(diào)用第三方接口,但接口暫時(shí)無法訪問,是否需要重試? 需要重試,可能是因?yàn)榫W(wǎng)絡(luò)原因短暫不能訪問

情況2: 消費(fèi)者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換異常,是否需要重試? 不需要重試,因?yàn)閷儆诔绦騜ug需要重新發(fā)布版本

總結(jié):對于情況2,如果消費(fèi)者代碼拋出異常是需要發(fā)布新版本才能解決的問題,那么不需要重試,重試也無濟(jì)于事。應(yīng)該采用日志記錄+定時(shí)任務(wù)job進(jìn)行健康檢查+人工進(jìn)行補(bǔ)償

3. 調(diào)用第三方接口自動實(shí)現(xiàn)補(bǔ)償機(jī)制

我們知道了,RabbitMQ在消費(fèi)者消費(fèi)發(fā)生異常時(shí),會自動進(jìn)行補(bǔ)償機(jī)制,所以我們(消費(fèi)者)在調(diào)用第三方接口時(shí),可以根據(jù)返回結(jié)果判斷是否成功:

  • 成功:正常消費(fèi)
  • 失?。菏謩訏佁幰粋€(gè)異常,這時(shí)RabbitMQ自動給我們做重試 (補(bǔ)償)。

4. 如何解決消費(fèi)者冪等性問題

防止重復(fù)消費(fèi) (MQ重試機(jī)制需要注意的問題)

產(chǎn)生原因:網(wǎng)絡(luò)延遲傳輸中,消費(fèi)者出現(xiàn)異常或者消費(fèi)者延遲消費(fèi),會造成進(jìn)行MQ重試補(bǔ)償,在重試過程中,可能會造成重復(fù)消費(fèi)。

面試題:MQ中消費(fèi)者如何保證冪等性問題,不被重復(fù)消費(fèi)?

在這里插入圖片描述

偽代碼:

生產(chǎn)者核心代碼:

請求頭設(shè)置消息id(messageId)

@Component
public class FanoutProducer {
 @Autowired
 private AmqpTemplate amqpTemplate;

 public void send(String queueName) {
  String msg = "my_fanout_msg:" + System.currentTimeMillis();
  //請求頭設(shè)置消息id(messageId)
  Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
    .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();
  System.out.println(msg + ":" + msg);
  amqpTemplate.convertAndSend(queueName, message);
 }
}

消費(fèi)者核心代碼:

@RabbitListener(queues = "fanout_email_queue")
 public void process(Message message) throws Exception {
  // 獲取消息Id
  String messageId = message.getMessageProperties().getMessageId();
  String msg = new String(message.getBody(), "UTF-8");
  //② 判斷唯一Id是否被消費(fèi),消息消費(fèi)成功后將id和狀態(tài)保存在日志表中,我們從(①步驟)表中獲取并判斷messageId的狀態(tài)即可
  //從redis中獲取messageId的value
  String value = redisUtils.get(messageId)+"";
  if(value.equals("1") ){ //表示已經(jīng)消費(fèi)
   return; //結(jié)束
  }
  System.out.println("郵件消費(fèi)者獲取生產(chǎn)者消息" + "messageId:" + messageId + ",消息內(nèi)容:" + msg);
  JSONObject jsonObject = JSONObject.parseObject(msg);
  // 獲取email參數(shù)
  String email = jsonObject.getString("email");
  // 請求地址
  String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;
  JSONObject result = HttpClientUtils.httpGet(emailUrl);
  if (result == null) {
   // 因?yàn)榫W(wǎng)絡(luò)原因,造成無法訪問,繼續(xù)重試
   throw new Exception("調(diào)用接口失敗!");
  }
  System.out.println("執(zhí)行結(jié)束....");
  //① 執(zhí)行到這里已經(jīng)消費(fèi)成功,我們可以修改messageId的狀態(tài),并存入日志表(可以存到redis中,key為消息Id、value為狀態(tài))
 }

5. SpringBoot整合RabbitMQ應(yīng)答模式(ACK)

1.修改配置simple下添加 acknowledge-mode: manual:

spring:
  rabbitmq:
    # 連接地址
    host: 127.0.0.1
    # 端口號
    port: 5672
    # 賬號
    username: guest
    # 密碼
    password: guest
    # 地址(類似于數(shù)據(jù)庫的概念)
    virtual-host: /admin_vhost
    # 消費(fèi)者監(jiān)聽相關(guān)配置
    listener:
      simple:
        retry:
          # 開啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制,默認(rèn)開啟并一直重試
          enabled: true
          # 最大重試次數(shù)
          max-attempts: 5
          # 重試間隔時(shí)間(毫秒)
          initial-interval: 3000
        # 開啟手動ack
        acknowledge-mode: manual

2.消費(fèi)者增加代碼:

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手動ack
channel.basicAck(deliveryTag, false);手動簽收
//郵件隊(duì)列
@Component
public class FanoutEamilConsumer {
 @RabbitListener(queues = "fanout_email_queue")
 public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
  System.out
    .println(Thread.currentThread().getName() + ",郵件消費(fèi)者獲取生產(chǎn)者消息msg:" + new String(message.getBody(), "UTF-8")
      + ",messageId:" + message.getMessageProperties().getMessageId());
  // 手動ack
  Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
  // 手動簽收
  channel.basicAck(deliveryTag, false);
 }
}

RabbitMQ 如何保證冪等性,數(shù)據(jù)一致性

mq的作用主要是用來解耦,削峰,異步,

增加MQ,系統(tǒng)的復(fù)雜性也會增加很多,

也會帶來其他的問題,比如MQ掛了怎么辦,怎么保持?jǐn)?shù)據(jù)的冪等性

冪等性問題通俗點(diǎn)講就是保證數(shù)據(jù)不被重復(fù)消費(fèi),同時(shí)數(shù)據(jù)也不能少,

也就是數(shù)據(jù)一致性問題。

下面是MQ丟失的3種情況

rabbitmq-message-lose

1,生產(chǎn)者發(fā)送消息至MQ的數(shù)據(jù)丟失

解決方法:在生產(chǎn)者端開啟comfirm 確認(rèn)模式,你每次寫的消息都會分配一個(gè)唯一的 id,

然后如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個(gè) ack 消息,告訴你說這個(gè)消息 ok 了

2,MQ收到消息,暫存內(nèi)存中,還沒消費(fèi),自己掛掉,數(shù)據(jù)會都丟失

解決方式:MQ設(shè)置為持久化。將內(nèi)存數(shù)據(jù)持久化到磁盤中

3,消費(fèi)者剛拿到消息,還沒處理,掛掉了,MQ又以為消費(fèi)者處理完

解決方式:用 RabbitMQ 提供的 ack 機(jī)制,簡單來說,就是你必須關(guān)閉 RabbitMQ 的自動 ack,可以通過一個(gè) api 來調(diào)用就行,然后每次你自己代碼里確保處理完的時(shí)候,再在程序里 ack 一把。這樣的話,如果你還沒處理完,不就沒有 ack 了?那 RabbitMQ 就認(rèn)為你還沒處理完,這個(gè)時(shí)候 RabbitMQ 會把這個(gè)消費(fèi)分配給別的 consumer 去處理,消息是不會丟的。

rabbitmq-message-lose-solution

數(shù)據(jù)重復(fù)的問題簡單的多,就是在消費(fèi)端判斷數(shù)據(jù)是否已經(jīng)被消費(fèi)過

  • 比如你拿個(gè)數(shù)據(jù)要寫庫,你先根據(jù)主鍵查一下,如果這數(shù)據(jù)都有了,你就別插入了,update 一下好吧。
  • 比如你是寫 Redis,那沒問題了,反正每次都是 set,天然冪等性。
  • 比如你不是上面兩個(gè)場景,那做的稍微復(fù)雜一點(diǎn),你需要讓生產(chǎn)者發(fā)送每條數(shù)據(jù)的時(shí)候,里面加一個(gè)全局唯一的 id,類似訂單 id 之類的東西,然后你這里消費(fèi)到了之后,先根據(jù)這個(gè) id 去比如 Redis 里查一下,之前消費(fèi)過嗎?如果沒有消費(fèi)過,你就處理,然后這個(gè) id 寫 Redis。如果消費(fèi)過了,那你就別處理了,保證別重復(fù)處理相同的消息即可。
  • 比如基于數(shù)據(jù)庫的唯一鍵來保證重復(fù)數(shù)據(jù)不會重復(fù)插入多條。因?yàn)橛形ㄒ绘I約束了,重復(fù)數(shù)據(jù)插入只會報(bào)錯(cuò),不會導(dǎo)致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實(shí)現(xiàn)代碼

    Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實(shí)現(xiàn)代碼

    這篇文章主要介紹了Spring Cloud Zuul路由網(wǎng)關(guān)服務(wù)過濾實(shí)現(xiàn)代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-04-04
  • Java中使用同步回調(diào)和異步回調(diào)的示例詳解

    Java中使用同步回調(diào)和異步回調(diào)的示例詳解

    這篇文章主要介紹了Java中使用同步回調(diào)和異步回調(diào)的相關(guān)資料,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-04-04
  • Java如何將Excel數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫

    Java如何將Excel數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫

    這篇文章主要為大家詳細(xì)介紹了Java將Excel數(shù)據(jù)導(dǎo)入到數(shù)據(jù)庫的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • SpringBoot中分頁插件PageHelper的使用詳解

    SpringBoot中分頁插件PageHelper的使用詳解

    分頁查詢是為了高效展示大量數(shù)據(jù),通過分頁將數(shù)據(jù)劃分為多個(gè)部分逐頁展示,原生方法需手動計(jì)算數(shù)據(jù)起始行,而使用PageHelper插件則簡化這一過程,本文給大家介紹SpringBoot中分頁插件PageHelper的使用,感興趣的朋友一起看看吧
    2024-09-09
  • java實(shí)現(xiàn)利用String類的簡單方法讀取xml文件中某個(gè)標(biāo)簽中的內(nèi)容

    java實(shí)現(xiàn)利用String類的簡單方法讀取xml文件中某個(gè)標(biāo)簽中的內(nèi)容

    下面小編就為大家?guī)硪黄猨ava實(shí)現(xiàn)利用String類的簡單方法讀取xml文件中某個(gè)標(biāo)簽中的內(nèi)容。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2016-12-12
  • SpringBoot自定義Banner使用詳解

    SpringBoot自定義Banner使用詳解

    這篇文章主要介紹了SpringBoot自定義Banner使用詳解,啟動 Spring Boot 時(shí),幾乎總是能在控制臺上方看到如下橫幅,這個(gè)也叫字符畫、英文ASCII藝術(shù)字,這就是banner,我們來看一下如何使用,需要的朋友可以參考下
    2024-01-01
  • hibernate一對多關(guān)聯(lián)映射學(xué)習(xí)小結(jié)

    hibernate一對多關(guān)聯(lián)映射學(xué)習(xí)小結(jié)

    這篇文章主要介紹了hibernate一對多關(guān)聯(lián)映射學(xué)習(xí)小結(jié),需要的朋友可以參考下
    2017-09-09
  • java實(shí)現(xiàn)支付寶支付接口的調(diào)用

    java實(shí)現(xiàn)支付寶支付接口的調(diào)用

    本文主要介紹了java實(shí)現(xiàn)支付寶支付接口的調(diào)用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-07-07
  • Java創(chuàng)建型設(shè)計(jì)模式之工廠方法模式深入詳解

    Java創(chuàng)建型設(shè)計(jì)模式之工廠方法模式深入詳解

    工廠方法模式(FACTORY METHOD)是一種常用的類創(chuàng)建型設(shè)計(jì)模式,此模式的核心精神是封裝類中變化的部分,提取其中個(gè)性化善變的部分為獨(dú)立類,通過依賴注入以達(dá)到解耦、復(fù)用和方便后期維護(hù)拓展的目的。它的核心結(jié)構(gòu)有四個(gè)角色,分別是抽象工廠、具體工廠、抽象產(chǎn)品、具體產(chǎn)品
    2022-09-09
  • java異常處理攔截器詳情

    java異常處理攔截器詳情

    這篇文章主要介紹了java異常處理攔截器,使用異常處理攔截器,可以不用寫那么多try…catch…,下面就來學(xué)習(xí)關(guān)于java異常處理攔截器的詳情內(nèi)容吧,需要的朋友可以參考一下
    2021-10-10

最新評論