亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Pulsar源碼徹底解決重復(fù)消費(fèi)問題

 更新時間:2023年05月29日 11:29:53   作者:crossoverJie  
這篇文章主要為大家介紹了Pulsar源碼徹底解決重復(fù)消費(fèi)問題,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

背景

最近真是和 Pulsar 杠上了,業(yè)務(wù)團(tuán)隊反饋說是線上有個應(yīng)用消息重復(fù)消費(fèi)。

而且在測試環(huán)境是可以穩(wěn)定復(fù)現(xiàn)的,根據(jù)經(jīng)驗來看一般能穩(wěn)定復(fù)現(xiàn)的都比較好解決。

定位問題

接著便是定位問題了,根據(jù)之前的經(jīng)驗讓業(yè)務(wù)按照這幾種情況先排查一下:

通過排查:1,2可以排除了。

  • 沒有相關(guān)日志
  • 存在異常,但最外層也捕獲了,所以不管有無異常都會 ACK。

第三個也在消費(fèi)的入口和提交消息出計算了時間,最終發(fā)現(xiàn)都是在2s左右 ACK 的。

偽代碼如下:

Consumer consumer = client.newConsumer()
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .topic(topic)
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscriptionName("my-sub")
                .messageListener(new MessageListener<byte[]>() {
                    @SneakyThrows
                    @Override
                    public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                        log.info("msg_id{}",msg.getMessageId().toString());
                        TimeUnit.SECONDS.sleep(2);
                        consumer.acknowledge(msg);
                    }
                })
                .subscribe();

那這就很奇怪了,因為代碼里配置的 ackTimeout 是 30s,理論上來說是不會存在超時導(dǎo)致消息重發(fā)的。

為了排除是否是超時引起的,直接將業(yè)務(wù)代碼注釋掉了,等于是消息收到后立即就 ACK,經(jīng)過測試發(fā)現(xiàn)這樣確實就沒有重復(fù)消費(fèi)了。

為了再次確認(rèn)是不是和 ackTimeout 有關(guān),直接將 .ackTimeout(30, TimeUnit.SECONDS) 注釋掉后測試,發(fā)現(xiàn)也沒有重復(fù)消費(fèi)了。

確認(rèn)原因

既然如此那一定是和這個配置有關(guān)了,但看代碼確實沒有超時,為了定位具體原因只有去看 client 的源碼了。

這里簡單梳理下消息的消費(fèi)的流程:

  • 根據(jù) .receiverQueueSize(1000) 的配置,默認(rèn)情況下 broker 會直接給客戶端推送 1000 條消息。
  • 客戶端將這 1000 條消息保存到內(nèi)部隊列中。
  • 如果使用同步消費(fèi) receive() 時,本質(zhì)上就是去 take 這個內(nèi)部隊列。
  • 如果是使用的是 messageListener 異步消費(fèi)并配置 ackTimeout,每當(dāng)從隊列里獲得一條消息后便會把這條消息加入 UnAckedMessageTracker 內(nèi)部的一個時間輪中,定時檢測頂部是否存在消息,如果存在則會觸發(fā)重新投遞。
    4.1 加入時間輪后,異步調(diào)用我們自定義的事件,這個異步操作是提交到一個無界隊列中由單個線程依次排隊執(zhí)行(這點是這次問題的關(guān)鍵)
  • 業(yè)務(wù) ACK 的時候會從時間輪中刪除消息,所以如果消息 ACK 的足夠快,在第四步就不會獲取到消息進(jìn)行重新投遞。

整體流程如上圖,代碼細(xì)節(jié)如下圖:

所以問題的根本原因就是寫入時間輪(UnAckedMessageTracker)開始倒計時的線程和回調(diào)業(yè)務(wù)邏輯的不是同一個線程。

如果業(yè)務(wù)執(zhí)行耗時,等到消息從那個單線程的無界隊列中取出來的時候很有可能已經(jīng)過了 ackTimeou 的時間,從而導(dǎo)致了超時重發(fā)。

也就是用戶所理解的 ackTimeout 周期(應(yīng)該進(jìn)入回調(diào)時候開始計時)和 SDK 實現(xiàn)的不一致造成的。

之后我再次確認(rèn)同樣的代碼換為同步消費(fèi)是沒有問題的,不會導(dǎo)致重復(fù)消費(fèi):

while (true) {
Message msg = consumer.receive();
            log.info(
                    "consumer Message received: " + new String(msg.getData()) + msg.getMessageId().toString());
            TimeUnit.SECONDS.sleep(2);
            consumer.acknowledge(msg);    
}

查看代碼后發(fā)現(xiàn)同步代碼的獲取消息和加入 UnAckedMessageTracker 時間輪是同步的,也就不會出現(xiàn)超時的問題。

總結(jié)

所以其實 是messageListener 異步消費(fèi)的 ackTimeout 的語義是有問題的,需要將加入 UnAckedMessageTracker 處移動到回調(diào)函數(shù)中同步調(diào)用。

我查看了最新的 2.11.x 版本的代碼依然沒有修復(fù),正準(zhǔn)備提個 PR 切換到 master 時才發(fā)現(xiàn)已經(jīng)有相關(guān)的 PR 了,只是還沒有發(fā)版。

修復(fù)的背景和思路也是類似的,具體參考:

https://github.com/apache/pul...

其實業(yè)務(wù)中并不推薦使用 ackTimeout 這個配置了,不好預(yù)估時間從而導(dǎo)致超時,而且我相信大部分業(yè)務(wù)配置好 ackTImeout 后直到后續(xù)出問題的時候才想起來要改。

所以干脆一開始就不要使用。

在 go 版本的 SDK 中直接廢棄掉了這個參數(shù),推薦使用 nack API 替換。

以上就是Pulsar源碼徹底解決重復(fù)消費(fèi)問題的詳細(xì)內(nèi)容,更多關(guān)于Pulsar重復(fù)消費(fèi)解決的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Springboot獲取文件內(nèi)容如何將MultipartFile轉(zhuǎn)File

    Springboot獲取文件內(nèi)容如何將MultipartFile轉(zhuǎn)File

    本文給大家介紹Springboot獲取文件內(nèi)容,將MultipartFile轉(zhuǎn)File方法,本文結(jié)合示例代碼給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧
    2024-01-01
  • Java實現(xiàn)斗地主的發(fā)牌功能

    Java實現(xiàn)斗地主的發(fā)牌功能

    這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)斗地主的發(fā)牌功能,含按順序發(fā)牌和玩家牌排序顯示等功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-06-06
  • java poi導(dǎo)出excel時如何設(shè)置手動換行

    java poi導(dǎo)出excel時如何設(shè)置手動換行

    這篇文章主要介紹了java poi導(dǎo)出excel時如何設(shè)置手動換行,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06
  • idea創(chuàng)建springboot項目(版本只能選擇17和21)的解決方法

    idea創(chuàng)建springboot項目(版本只能選擇17和21)的解決方法

    idea2023創(chuàng)建spring boot項目時,java版本無法選擇11,本文主要介紹了idea創(chuàng)建springboot項目(版本只能選擇17和21),下面就來介紹一下解決方法,感興趣的可以了解一下
    2024-01-01
  • Java封裝數(shù)組之添加元素操作實例分析

    Java封裝數(shù)組之添加元素操作實例分析

    這篇文章主要介紹了Java封裝數(shù)組之添加元素操作,結(jié)合實例形式分析了Java封裝數(shù)組實現(xiàn)元素追加、插入等相關(guān)操作技巧,需要的朋友可以參考下
    2020-03-03
  • Mybatis的核心配置文件使用方法

    Mybatis的核心配置文件使用方法

    Mybatis的核心配置文件有兩個,一個是全局配置文件,它包含了會深深影響Mybatis行為的設(shè)置和屬性信息;一個是映射文件,它很簡單,讓用戶能更專注于SQL代碼,本文主要介紹了Mybatis的核心配置文件使用方法,感興趣的可以了解一下
    2023-11-11
  • Spring?Boot3虛擬線程的使用步驟詳解

    Spring?Boot3虛擬線程的使用步驟詳解

    虛擬線程是 Java 19 中引入的一個新特性,旨在通過簡化線程管理來提升應(yīng)用程序的并發(fā)性能,這篇文章主要介紹了Spring?Boot3虛擬線程的使用步驟,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2025-03-03
  • Java實現(xiàn)多個sheet頁數(shù)據(jù)導(dǎo)出功能

    Java實現(xiàn)多個sheet頁數(shù)據(jù)導(dǎo)出功能

    這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)多個sheet頁數(shù)據(jù)導(dǎo)出功能的相關(guān)知識,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2024-03-03
  • java 對象輸入輸出流讀寫文件的操作實例

    java 對象輸入輸出流讀寫文件的操作實例

    這篇文章主要介紹了java 對象輸入輸出流讀寫文件的操作實例的相關(guān)資料,這里使用實現(xiàn)Serializable接口,需要的朋友可以參考下
    2017-07-07
  • java 實現(xiàn)圖片圓角處理、背景透明化

    java 實現(xiàn)圖片圓角處理、背景透明化

    這篇文章主要介紹了java 實現(xiàn)圖片圓角處理、背景透明化,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11

最新評論