java中Pulsar?InterruptedException?異常
背景
今天收到業(yè)務(wù)團(tuán)隊(duì)反饋線上有個(gè)應(yīng)用往 Pulsar 中發(fā)送消息失敗了,經(jīng)過日志查看得知是發(fā)送消息時(shí)候拋出了 java.lang.InterruptedException
異常。
和業(yè)務(wù)溝通后得知是在一個(gè) gRPC
接口中觸發(fā)的消息發(fā)送,大約持續(xù)了半個(gè)小時(shí)的異常后便恢復(fù)正常了,這是整個(gè)問題的背景。
前置排查
拿到該問題后首先排查下是否是共性問題,查看了其他的應(yīng)用沒有發(fā)現(xiàn)類似的異常;同時(shí)也查看了 Pulsar broker 的監(jiān)控大盤,在這個(gè)時(shí)間段依然沒有波動(dòng)和異常;
這樣可以初步排除是 Pulsar 服務(wù)端的問題。
接著便是查看應(yīng)用那段時(shí)間的負(fù)載情況,從應(yīng)用 QPS 到 JVM 的各個(gè)內(nèi)存情況依然沒發(fā)現(xiàn)有什么明顯的變化。
Pulsar 源碼排查
既然看起來應(yīng)用本身和 Pulsar broker 都沒有問題的話那就只能從異常本身來排查了。
首先第一步要得知具體使用的是 Pulsar-client
是版本是多少,因?yàn)闃I(yè)務(wù)使用的是內(nèi)部基于官方 SDK 封裝 springboot starter
所以第一步還得排查這個(gè) starter
是否有影響。
通過查看源碼基本排除了 starter
的嫌疑,里面只是簡單的封裝了 SDK
的功能而已。
org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393) at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java) at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source) at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java) at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292) at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363) at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191) at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343) at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318) at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)
接下來便只能是分析堆棧了,因?yàn)?Pulsar-client 的部分實(shí)現(xiàn)源碼是沒有直接打包到依賴中的,反編譯的話許多代碼行數(shù)對(duì)不上,所以需要將官方的源碼拉到本地,切換到對(duì)于的分支進(jìn)行查看。
這一步稍微有點(diǎn)麻煩,首先是代碼庫還挺大的,加上之前如果沒有準(zhǔn)備好 Pulsar 的開發(fā)環(huán)境的話估計(jì)會(huì)勸退一部分人;
但其實(shí)大部分問題都是網(wǎng)絡(luò)造成的,只要配置一些 Maven 鏡像多試幾次總會(huì)編譯成功。
我這里直接將分支切換到 branch-2.8
。
從堆棧的頂部開始排查 TypedMessageBuilderImpl.java:91
:
看起來是內(nèi)部異步發(fā)送消息的時(shí)候拋了異常。
接著往下看到這里:
java.lang.InterruptedException at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at
看起來是這里沒錯(cuò),但是代碼行數(shù)明顯不對(duì);因?yàn)?2.8 這個(gè)分支也是修復(fù)過幾個(gè)版本,所以中間有修改導(dǎo)致代碼行數(shù)與最新代碼對(duì)不上也正常。
semaphore.get().acquire();
不過初步來看應(yīng)該是這行代碼拋出的線程終端異常,這里看起來只有他最有可能了。
為了確認(rèn)是否是真的是這行代碼,這個(gè)文件再往前翻了幾個(gè)版本最終確認(rèn)了就是這行代碼沒錯(cuò)了。
我們點(diǎn)開java.util.concurrent.Semaphore#acquire()
的源碼,
/** * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * for a permit, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted() || (tryAcquireShared(arg) < 0 && acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); }
通過源碼會(huì)發(fā)現(xiàn) acquire()
函數(shù)確實(shí)會(huì)響應(yīng)中斷,一旦檢測到當(dāng)前線程被中斷后便會(huì)拋出 InterruptedException
異常。
定位問題
所以問題的原因基本確定了,就是在 Pulsar 的發(fā)送消息線程被中斷了導(dǎo)致的,但為啥會(huì)被中斷還需要繼續(xù)排查。
我們知道線程中斷是需要調(diào)用 Thread.currentThread().interrupt();
API的,首先猜測是否 Pulsar 客戶端內(nèi)部有個(gè)線程中斷了這個(gè)發(fā)送線程。
于是我在 pulsar-client
這個(gè)模塊中搜索了相關(guān)代碼:
排除掉和 producer 不相關(guān)的地方,其余所有中斷線程的代碼都是在有了該異常之后繼續(xù)傳遞而已;所以初步來看 pulsar-client 內(nèi)部沒有主動(dòng)中斷的操作。
既然 Pulsar 自己沒有做,那就只可能是業(yè)務(wù)做的了?
于是我在業(yè)務(wù)代碼中搜索了一下:
果然在業(yè)務(wù)代碼中搜到了唯一一處中斷的地方,而且通過調(diào)用關(guān)系得知這段代碼是在消息發(fā)送前執(zhí)行的,并且和 Pulsar 發(fā)送函數(shù)處于同一線程。
大概的偽代碼如下:
List.of(1, 2, 3).stream().map(e -> { return CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException ex) { throw new RuntimeException(ex); } return e; }); } ).collect(Collectors.toList()).forEach(f -> { try { Integer integer = f.get(); log.info("====" + integer); if (integer==3){ TimeUnit.SECONDS.sleep(10); Thread.currentThread().interrupt(); } } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }); MessageId send = producer.newMessage().value(msg.getBytes()).send();
執(zhí)行這段代碼可以完全復(fù)現(xiàn)同樣的堆棧。
幸好中斷這里還打得有日志:
通過日志搜索發(fā)現(xiàn)異常的時(shí)間和這個(gè)中斷的日志時(shí)間點(diǎn)完全重合,這樣也就知道根本原因了。
因?yàn)闃I(yè)務(wù)線程和消息發(fā)送線程是同一個(gè),在某些情況下會(huì)執(zhí)行 Thread.currentThread().interrupt();
,其實(shí)單純執(zhí)行這行函數(shù)并不會(huì)發(fā)生什么,只要沒有去響應(yīng)這個(gè)中斷,也就是 Semaphore
源碼中的判斷了線程中斷的標(biāo)記:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted() || (tryAcquireShared(arg) < 0 && acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); }
但恰好這里業(yè)務(wù)中斷后自己并沒有去判斷這個(gè)標(biāo)記,導(dǎo)致 Pulsar 內(nèi)部去判斷了,最終拋出了這個(gè)異常。
總結(jié)
所以歸根結(jié)底還是這里的代碼不合理導(dǎo)致的,首先是自己中斷了線程但也沒使用,從而導(dǎo)致有被其他基礎(chǔ)庫使用的可能,所以會(huì)造成了一些不可預(yù)知的后果。
再一個(gè)是不建議在業(yè)務(wù)代碼中使用 Thread.currentThread().interrupt();
這類代碼,第一眼根本不知道是要干啥,也不易維護(hù)。
其實(shí)本質(zhì)上線程中斷也是線程間通信的一種手段,有這類需求完全可以換為內(nèi)置的 BlockQueue
這類函數(shù)來實(shí)現(xiàn)。
以上就是java中Pulsar InterruptedException 異常的詳細(xì)內(nèi)容,更多關(guān)于 Pulsar InterruptedException異常的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring中的@ControllerAdvice三種用法詳解
這篇文章主要介紹了Spring中的@ControllerAdvice三種用法詳解,加了@ControllerAdvice的類為那些聲明了(@ExceptionHandler、@InitBinder或@ModelAttribute注解修飾的)方法的類而提供的<BR>專業(yè)化的@Component,以供多個(gè)Controller類所共享,需要的朋友可以參考下2024-01-01在SpringBoot中整合數(shù)據(jù)源的示例詳解
這篇文章主要介紹了在SpringBoot中如何整合數(shù)據(jù)源,本文介紹了如何在SpringBoot項(xiàng)目中整合常見的數(shù)據(jù)源,包括JdbcTemplate、MyBatis和JPA,并探討了如何配置和使用多數(shù)據(jù)源,需要的朋友可以參考下2023-06-06java實(shí)現(xiàn)文件編碼轉(zhuǎn)換的方法
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)文件編碼轉(zhuǎn)換的方法,分享一個(gè)文件編碼轉(zhuǎn)換的工具類,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-05-05SpringBoot+Redis實(shí)現(xiàn)數(shù)據(jù)字典的方法
這篇文章主要介紹了SpringBoot+Redis實(shí)現(xiàn)數(shù)據(jù)字典的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10JAVA實(shí)現(xiàn)基于Tcp協(xié)議的簡單Socket通信實(shí)例
本篇文章主要介紹了JAVA實(shí)現(xiàn)基于Tcp協(xié)議的簡單Socket通信實(shí)例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-01-01Java Thread之Sleep()使用方法總結(jié)
這篇文章主要介紹了Java Thread之Sleep()使用方法總結(jié),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05解決SpringMVC使用@RequestBody注解報(bào)400錯(cuò)誤的問題
這篇文章主要介紹了解決SpringMVC使用@RequestBody注解報(bào)400錯(cuò)誤的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09postman中實(shí)現(xiàn)傳遞@RequestBody參數(shù)
這篇文章主要介紹了postman中實(shí)現(xiàn)傳遞@RequestBody參數(shù),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10Java?synchronized底層實(shí)現(xiàn)原理以及鎖優(yōu)化
Synchronized是Java中解決并發(fā)問題的一種最常用的方法,也是最簡單的一種方法,下面這篇文章主要給大家介紹了關(guān)于Java?synchronized底層實(shí)現(xiàn)原理以及鎖優(yōu)化的相關(guān)資料,需要的朋友可以參考下2022-02-02