Spring?Boot異步線程間數(shù)據(jù)傳遞的四種方式
Spring Boot 自定義線程池實(shí)現(xiàn)異步開發(fā)
Spring Boot 自定義線程池實(shí)現(xiàn)異步開發(fā)相信看過(guò)的都了解,但是在實(shí)際開發(fā)中需要在父子線程之間傳遞一些數(shù)據(jù),比如用戶信息,鏈路信息等等
比如用戶登錄信息使用ThreadLocal存放保證線程隔離,代碼如下:
/** * @author 公眾號(hào):碼猿技術(shù)專欄 * @description 用戶上下文信息 */ public class OauthContext { private static final ThreadLocal<LoginVal> loginValThreadLocal=new ThreadLocal<>(); public static LoginVal get(){ return loginValThreadLocal.get(); } public static void set(LoginVal loginVal){ loginValThreadLocal.set(loginVal); } public static void clear(){ loginValThreadLocal.remove(); } }
那么子線程想要獲取這個(gè)LoginVal如何做呢?
今天就來(lái)介紹幾種優(yōu)雅的方式實(shí)現(xiàn)Spring Boot 內(nèi)部的父子線程的數(shù)據(jù)傳遞。
1. 手動(dòng)設(shè)置
每執(zhí)行一次異步線程都要分為兩步:
- 獲取父線程的LoginVal
- 將LoginVal設(shè)置到子線程,達(dá)到復(fù)用
代碼如下:
public void handlerAsync() { //1. 獲取父線程的loginVal LoginVal loginVal = OauthContext.get(); log.info("父線程的值:{}",OauthContext.get()); CompletableFuture.runAsync(()->{ //2. 設(shè)置子線程的值,復(fù)用 OauthContext.set(loginVal); log.info("子線程的值:{}",OauthContext.get()); }); }
雖然能夠?qū)崿F(xiàn)目的,但是每次開異步線程都需要手動(dòng)設(shè)置,重復(fù)代碼太多,看了頭疼,你認(rèn)為優(yōu)雅嗎?
2. 線程池設(shè)置TaskDecorator
TaskDecorator是什么?官方api的大致意思:這是一個(gè)執(zhí)行回調(diào)方法的裝飾器,主要應(yīng)用于傳遞上下文,或者提供任務(wù)的監(jiān)控/統(tǒng)計(jì)信息。
知道有這么一個(gè)東西,如何去使用?
TaskDecorator是一個(gè)接口,首先需要去實(shí)現(xiàn)它,代碼如下:
/** * @description 上下文裝飾器 */ public class ContextTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { //獲取父線程的loginVal LoginVal loginVal = OauthContext.get(); return () -> { try { // 將主線程的請(qǐng)求信息,設(shè)置到子線程中 OauthContext.set(loginVal); // 執(zhí)行子線程,這一步不要忘了 runnable.run(); } finally { // 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 OauthContext.clear(); } }; } }
這里我只是設(shè)置了LoginVal,實(shí)際開發(fā)中其他的共享數(shù)據(jù),比如SecurityContext
,RequestAttributes
....
TaskDecorator
需要結(jié)合線程池使用,實(shí)際開發(fā)中異步線程建議使用線程池,只需要在對(duì)應(yīng)的線程池配置一下,代碼如下:
@Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor(); poolTaskExecutor.setCorePoolSize(xx); poolTaskExecutor.setMaxPoolSize(xx); // 設(shè)置線程活躍時(shí)間(秒) poolTaskExecutor.setKeepAliveSeconds(xx); // 設(shè)置隊(duì)列容量 poolTaskExecutor.setQueueCapacity(xx); //設(shè)置TaskDecorator,用于解決父子線程間的數(shù)據(jù)復(fù)用 poolTaskExecutor.setTaskDecorator(new ContextTaskDecorator()); poolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池 poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); return poolTaskExecutor; }
此時(shí)業(yè)務(wù)代碼就不需要去設(shè)置子線程的值,直接使用即可,代碼如下:
public void handlerAsync() { log.info("父線程的用戶信息:{}", OauthContext.get()); //執(zhí)行異步任務(wù),需要指定的線程池 CompletableFuture.runAsync(()-> log.info("子線程的用戶信息:{}", OauthContext.get()),taskExecutor); }
來(lái)看一下結(jié)果,如下圖:
這里使用的是CompletableFuture
執(zhí)行異步任務(wù),使用@Async
這個(gè)注解同樣是可行的。
注意:無(wú)論使用何種方式,都需要指定線程池
3. InheritableThreadLocal
這種方案不建議使用,InheritableThreadLocal雖然能夠?qū)崿F(xiàn)父子線程間的復(fù)用,但是在線程池中使用會(huì)存在復(fù)用的問(wèn)題,具體的可以看陳某之前的文章:微服務(wù)中使用阿里開源的TTL,優(yōu)雅的實(shí)現(xiàn)身份信息的線程間復(fù)用
這種方案使用也是非常簡(jiǎn)單,直接用InheritableThreadLocal替換ThreadLocal即可,代碼如下:
/** * * @description 用戶上下文信息 */ public class OauthContext { private static final InheritableThreadLocal<LoginVal> loginValThreadLocal=new InheritableThreadLocal<>(); public static LoginVal get(){ return loginValThreadLocal.get(); } public static void set(LoginVal loginVal){ loginValThreadLocal.set(loginVal); } public static void clear(){ loginValThreadLocal.remove(); } }
4. TransmittableThreadLocal
TransmittableThreadLocal是阿里開源的工具,彌補(bǔ)了InheritableThreadLocal的缺陷,在使用線程池等會(huì)池化復(fù)用線程的執(zhí)行組件情況下,提供ThreadLocal
值的傳遞功能,解決異步執(zhí)行時(shí)上下文傳遞的問(wèn)題。
使用起來(lái)也是非常簡(jiǎn)單,添加依賴如下:
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.14.2</version> </dependency>
OauthContext改造代碼如下:
/** * @description 用戶上下文信息 */ public class OauthContext { private static final TransmittableThreadLocal<LoginVal> loginValThreadLocal=new TransmittableThreadLocal<>(); public static LoginVal get(){ return loginValThreadLocal.get(); } public static void set(LoginVal loginVal){ loginValThreadLocal.set(loginVal); } public static void clear(){ loginValThreadLocal.remove(); } }
關(guān)于TransmittableThreadLocal想深入了解其原理可以看陳某之前的文章:微服務(wù)中使用阿里開源的TTL,優(yōu)雅的實(shí)現(xiàn)身份信息的線程間復(fù)用,應(yīng)用還是非常廣泛的
TransmittableThreadLocal原理
從定義來(lái)看,TransimittableThreadLocal
繼承于InheritableThreadLocal
,并實(shí)現(xiàn)TtlCopier
接口,它里面只有一個(gè)copy
方法。所以主要是對(duì)InheritableThreadLocal
的擴(kuò)展。
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T>
在TransimittableThreadLocal
中添加holder
屬性。這個(gè)屬性的作用就是被標(biāo)記為具備線程傳遞資格的對(duì)象都會(huì)被添加到這個(gè)對(duì)象中。
要標(biāo)記一個(gè)類,比較容易想到的方式,就是給這個(gè)類新增一個(gè)Type
字段,還有一個(gè)方法就是將具備這種類型的的對(duì)象都添加到一個(gè)靜態(tài)全局集合中。之后使用時(shí),這個(gè)集合里的所有值都具備這個(gè)標(biāo)記。
// 1. holder本身是一個(gè)InheritableThreadLocal對(duì)象 // 2. 這個(gè)holder對(duì)象的value是WeakHashMap<TransmittableThreadLocal<Object>, ?> // 2.1 WeekHashMap的value總是null,且不可能被使用。 // 2.2 WeekHasshMap支持value=null private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() { @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); } /** * 重寫了childValue方法,實(shí)現(xiàn)上直接將父線程的屬性作為子線程的本地變量對(duì)象。 */ @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue); } };
應(yīng)用代碼是通過(guò)TtlExecutors
工具類對(duì)線程池對(duì)象進(jìn)行包裝。工具類只是簡(jiǎn)單的判斷,輸入的線程池是否已經(jīng)被包裝過(guò)、非空校驗(yàn)等,然后返回包裝類ExecutorServiceTtlWrapper
。根據(jù)不同的線程池類型,有不同和的包裝類。
@Nullable public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) { if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced) { return executorService; } return new ExecutorServiceTtlWrapper(executorService); }
進(jìn)入包裝類ExecutorServiceTtlWrapper
??梢宰⒁獾讲徽撌峭ㄟ^(guò)ExecutorServiceTtlWrapper#submit
方法或者是ExecutorTtlWrapper#execute
方法,都會(huì)將線程對(duì)象包裝成TtlCallable
或者TtlRunnable
,用于在真正執(zhí)行run
方法前做一些業(yè)務(wù)邏輯。
/** * 在ExecutorServiceTtlWrapper實(shí)現(xiàn)submit方法 */ @NonNull @Override public <T> Future<T> submit(@NonNull Callable<T> task) { return executorService.submit(TtlCallable.get(task)); } /** * 在ExecutorTtlWrapper實(shí)現(xiàn)execute方法 */ @Override public void execute(@NonNull Runnable command) { executor.execute(TtlRunnable.get(command)); }
所以,重點(diǎn)的核心邏輯應(yīng)該是在TtlCallable#call()
或者TtlRunnable#run()
中。以下以TtlCallable
為例,TtlRunnable
同理類似。在分析call()
方法之前,先看一個(gè)類Transmitter
public static class Transmitter { /** * 捕獲當(dāng)前線程中的是所有TransimittableThreadLocal和注冊(cè)ThreadLocal的值。 */ @NonNull public static Object capture() { return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } /** * 捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。 */ private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>(); for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } /** * 捕獲注冊(cè)的ThreadLocal的值,也就是原本線程中的ThreadLocal,可以注冊(cè)到TTL中,在 * 進(jìn)行線程池本地變量傳遞時(shí)也會(huì)被傳遞。 */ private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() { final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap<ThreadLocal<Object>, Object>(); for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){ final ThreadLocal<Object> threadLocal = entry.getKey(); final TtlCopier<Object> copier = entry.getValue(); threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get())); } return threadLocal2Value; } /** * 將捕獲到的本地變量進(jìn)行替換子線程的本地變量,并且返回子線程現(xiàn)有的本地變量副本backup。 * 用于在執(zhí)行run/call方法之后,將本地變量副本恢復(fù)。 */ @NonNull public static Object replay(@NonNull Object captured) { final Snapshot capturedSnapshot = (Snapshot) captured; return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } /** * 替換TransmittableThreadLocal */ @NonNull private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) { // 創(chuàng)建副本backup HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); // 對(duì)當(dāng)前線程的本地變量進(jìn)行副本拷貝 backup.put(threadLocal, threadLocal.get()); // 若出現(xiàn)調(diào)用線程中不存在某個(gè)線程變量,而線程池中線程有,則刪除線程池中對(duì)應(yīng)的本地變量 if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 將捕獲的TTL值打入線程池獲取到的線程TTL中。 setTtlValuesTo(captured); // 是一個(gè)擴(kuò)展點(diǎn),調(diào)用TTL的beforeExecute方法。默認(rèn)實(shí)現(xiàn)為空 doExecuteCallback(true); return backup; } private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) { final HashMap<ThreadLocal<Object>, Object> backup = new HashMap<ThreadLocal<Object>, Object>(); for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); backup.put(threadLocal, threadLocal.get()); final Object value = entry.getValue(); if (value == threadLocalClearMark) threadLocal.remove(); else threadLocal.set(value); } return backup; } /** * 清除單線線程的所有TTL和TL,并返回清除之氣的backup */ @NonNull public static Object clear() { final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>(); final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap<ThreadLocal<Object>, Object>(); for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){ final ThreadLocal<Object> threadLocal = entry.getKey(); threadLocal2Value.put(threadLocal, threadLocalClearMark); } return replay(new Snapshot(ttl2Value, threadLocal2Value)); } /** * 還原 */ public static void restore(@NonNull Object backup) { final Snapshot backupSnapshot = (Snapshot) backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); } private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) { // 擴(kuò)展點(diǎn),調(diào)用TTL的afterExecute doExecuteCallback(false); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); if (!backup.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 將本地變量恢復(fù)成備份版本 setTtlValuesTo(backup); } private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) { TransmittableThreadLocal<Object> threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } } private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) { for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } } /** * 快照類,保存TTL和TL */ private static class Snapshot { final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value; final HashMap<ThreadLocal<Object>, Object> threadLocal2Value; private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, HashMap<ThreadLocal<Object>, Object> threadLocal2Value) { this.ttl2Value = ttl2Value; this.threadLocal2Value = threadLocal2Value; } }
進(jìn)入TtlCallable#call()
方法。
@Override public V call() throws Exception { Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after call!"); } // 調(diào)用replay方法將捕獲到的當(dāng)前線程的本地變量,傳遞給線程池線程的本地變量, // 并且獲取到線程池線程覆蓋之前的本地變量副本。 Object backup = replay(captured); try { // 線程方法調(diào)用 return callable.call(); } finally { // 使用副本進(jìn)行恢復(fù)。 restore(backup); } }
到這基本上線程池方式傳遞本地變量的核心代碼已經(jīng)大概看完了??偟膩?lái)說(shuō)在創(chuàng)建TtlCallable
對(duì)象是,調(diào)用capture()
方法捕獲調(diào)用方的本地線程變量,在call()
執(zhí)行時(shí),將捕獲到的線程變量,替換到線程池所對(duì)應(yīng)獲取到的線程的本地變量中,并且在執(zhí)行完成之后,將其本地變量恢復(fù)到調(diào)用之前。
總結(jié)
上述列舉了4種方案,陳某這里推薦方案2和方案4,其中兩種方案的缺點(diǎn)非常明顯,實(shí)際開發(fā)中也是采用的方案2或者方案4
以上就是Spring Boot異步線程間數(shù)據(jù)傳遞的四種方式的詳細(xì)內(nèi)容,更多關(guān)于Spring Boot異步線程數(shù)據(jù)傳遞的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 在 Spring Boot 中使用異步線程時(shí)的 HttpServletRequest 復(fù)用問(wèn)題記錄
- SpringBoot異步線程父子線程數(shù)據(jù)傳遞的5種方式
- springboot?正確的在異步線程中使用request的示例代碼
- SpringBoot?異步線程間傳遞上下文方式
- SpringBoot獲取HttpServletRequest的3種方式總結(jié)
- SpringBoot詳細(xì)講解異步任務(wù)如何獲取HttpServletRequest
- SpringBoot實(shí)現(xiàn)任意位置獲取HttpServletRequest對(duì)象
- Spring Boot 中正確地在異步線程中使用 HttpServletRequest的方法
相關(guān)文章
StringUtils中的isEmpty、isNotEmpty、isBlank和isNotBlank的區(qū)別詳解
這篇文章主要介紹了StringUtils中的isEmpty、isNotEmpty、isBlank和isNotBlank的區(qū)別詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06mybatis類型轉(zhuǎn)換器如何實(shí)現(xiàn)數(shù)據(jù)加解密
這篇文章主要介紹了mybatis類型轉(zhuǎn)換器如何實(shí)現(xiàn)數(shù)據(jù)加解密,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09Java深度優(yōu)先遍歷解決排列組合問(wèn)題詳解
這篇文章主要介紹了Java深度優(yōu)先遍歷解決排列組合問(wèn)題詳解,深度優(yōu)先搜索是遞歸過(guò)程,帶有回退操作,因此需要使用棧存儲(chǔ)訪問(wèn)的路徑信息,當(dāng)訪問(wèn)到的當(dāng)前頂點(diǎn)沒(méi)有可以前進(jìn)的鄰接頂點(diǎn)時(shí),需要進(jìn)行出棧操作,將當(dāng)前位置回退至出棧元素位置,需要的朋友可以參考下2024-01-01java使用@Transactional時(shí)常犯的N種錯(cuò)誤
@Transactional是我們?cè)谟肧pring時(shí)候幾乎逃不掉的一個(gè)注解,本文主要介紹了使用?@Transactional?時(shí)常犯的N種錯(cuò)誤,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01使用okhttp替換Feign默認(rèn)Client的操作
這篇文章主要介紹了使用okhttp替換Feign默認(rèn)Client的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02Centos7安裝JDK1.8詳細(xì)過(guò)程實(shí)戰(zhàn)記錄
這篇文章主要給大家介紹了關(guān)于Centos7安裝JDK1.8的相關(guān)資料,文中通過(guò)圖文以及實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2022-09-09Security框架:如何使用CorsFilter解決前端跨域請(qǐng)求問(wèn)題
這篇文章主要介紹了Security框架:如何使用CorsFilter解決前端跨域請(qǐng)求問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11使用MappingJackson2XmlView實(shí)現(xiàn)JSON到XML的視圖轉(zhuǎn)換
MappingJackson2XmlView來(lái)實(shí)現(xiàn)從JSON到XML格式的響應(yīng)轉(zhuǎn)換,本文將通過(guò)案例,將展示如何將JSON格式的數(shù)據(jù)轉(zhuǎn)換為XML格式,以滿足不同客戶端的數(shù)據(jù)交換需求,需要的朋友可以參考下2024-07-07