Java線程之間數(shù)據(jù)傳遞的實現(xiàn)示例(4種)
前言
在業(yè)務(wù)系統(tǒng)的開發(fā)過程中,我們經(jīng)常會遇到父子線程數(shù)據(jù)傳遞(非調(diào)用參數(shù))的場景,如:登陸信息,調(diào)用者信息,TraceId的傳遞等業(yè)務(wù)場景,固總結(jié)4中方式進行線程之間數(shù)據(jù)傳遞。
ThreadLocal
代碼如下:
public class TtlParameterWrapper { ? ? private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>(); ? ? private TtlParameterWrapper() { ? ? } ? ? public static String getCaller() { ? ? ? ? return THREAD_LOCAL.get(); ? ? } ? ? public static void setCaller(String caller) { ? ? ? ?THREAD_LOCAL.set(caller); ? ? } ? ? public static void clear() { ? ? ? ?THREAD_LOCAL.remove(); ? ? } }
那么子線程想要獲取這個TtlParameterWrapper如何做呢?
- 獲取父線程的TtlParameterWrapper
- 將TtlParameterWrapper設(shè)置到子線程,達到復(fù)用
public void handler(){ // 1. 獲取父線程 TtlParameterWrapper.setCaller("caller path"); log.info("父線程的值 ->{}",TtlParameterWrapper.get()); CompletableFuture.runAsync(()->{ // 2. 設(shè)置子線程的值,復(fù)用 TtlParameterWrapper.setCaller("caller path"); log.info("子線程的值 ->{}", TtlParameterWrapper.getCaller()); }); }
總結(jié)
雖然最終達成了傳遞的目的,但是每次開異步線程都需要手動設(shè)置,代碼冗余繁雜,如果不這樣設(shè)置則無法跨線程進行傳遞;如果手動設(shè)置,將無法進行線程間進行傳遞,因為TheadLocal中的數(shù)據(jù)無法進行線程間進行傳遞。
InheritableThreadLocal
這種方案不建議使用,InheritableThreadLocal雖然能夠?qū)崿F(xiàn)父子線程間的復(fù)用,但是在線程池中使用會存在失敗的問題,原因:InheritableThreadLocal 在父線程創(chuàng)建子線程的時候,會將父線程中InheritableThreadLocal中存儲的數(shù)據(jù) 拷貝一份存儲到子線程的 InheritableThreadLocal中,但是在web的容器中使用了線程池,線程會被創(chuàng)建回收重復(fù)的利用,不會被銷毀重新創(chuàng)建,所以會存在實效的場景。
這種方案使用也是非常簡單,直接用InheritableThreadLocal替換ThreadLocal即可。
代碼如下:
public class TtlParameterWrapper { ? ? private static ?final ?InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>(); ? ? public static String getCaller(){ ? ? ? ? return inheritableThreadLocal.get(); ? ? } ? ? public static void setCaller(LoginVal loginVal){ ? ? ? ?inheritableThreadLocal.set(loginVal); ? ? } ? ? public static void clear(){ ? ? ? ?inheritableThreadLocal.remove(); ? ? } }
TransmittableThreadLocal
TransmittableThreadLocal是阿里開源的工具,解決了InheritableThreadLocal不能進行線程池間傳遞數(shù)據(jù)的缺陷,在使用線程池等會池化復(fù)用線程的執(zhí)行組件情況下,提供ThreadLocal值的傳遞功能,解決異步執(zhí)行時上下文傳遞的問題。
添加依賴
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.14.2</version> </dependency>
TtlParameterWrapper改造
public class TtlParameterWrapper { private static final TransmittableThreadLocal<String> TRANSMITTABLE_THREAD_LOCAL = new TransmittableThreadLocal<>(); private TtlParameterWrapper() { } public static String getCaller() { return TRANSMITTABLE_THREAD_LOCAL.get(); } public static void setCaller(String caller) { TRANSMITTABLE_THREAD_LOCAL.set(caller); } public static void clear() { TRANSMITTABLE_THREAD_LOCAL.remove(); } }
原理
從定義來看,TransimittableThreadLocal繼承于InheritableThreadLocal,并實現(xiàn)TtlCopier接口,它里面只有一個copy方法。所以主要是對InheritableThreadLocal的擴展。
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T>
在TransimittableThreadLocal中添加holder屬性。這個屬性的作用就是被標(biāo)記為具備線程傳遞資格的對象都會被添加到這個對象中。
要標(biāo)記一個類,比較容易想到的方式,就是給這個類新增一個Type字段,還有一個方法就是將具備這種類型的的對象都添加到一個靜態(tài)全局集合中。之后使用時,這個集合里的所有值都具備這個標(biāo)記。
//1. holder本身是一個InheritableThreadLocal對象 //2. 這個holder對象的value是WeakHashMap<TransmittableThreadLocal<Object>,?> // ? 2.1WeekHashMap的value總是null,且不可能被使用。 // ? ?2.2WeekHasshMap支持value=null private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,?>>(){ @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() { ?? ?return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); } /** ?* 重寫了childValue方法,實現(xiàn)上直接將父線程的屬性作為子線程的本地變量對象。 ?? ?*/ @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?>parentValue){ ?? ?return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue); }};
應(yīng)用代碼是通過TtlExecutors工具類對線程池對象進行包裝。工具類只是簡單的判斷,輸入的線程池是否已經(jīng)被包裝過、非空校驗等,然后返回包裝類ExecutorServiceTtlWrapper。根據(jù)不同的線程池類型,有不同和的包裝類。
@Nullable public static ExecutorServicegetTtlExecutorService(@Nullable ExecutorService executorService){ if (TtlAgent.isTtlAgentLoaded() || executorService == null || executorService instanceof TtlEnhanced){ return executorService; } return new ExecutorServiceTtlWrapper(executorService); }
進入包裝類ExecutorServiceTtlWrapper。可以注意到不論是通過ExecutorServiceTtlWrapper#submit方法或者是ExecutorTtlWrapper#execute方法,都會將線程對象包裝成TtlCallable或者TtlRunnable,用于在真正執(zhí)行run方法前做一些業(yè)務(wù)邏輯。
/** * 在ExecutorServiceTtlWrapper實現(xiàn)submit方法 */ @NonNull @Override public <T> Future<T> submit(@NonNull Callable<T> task){ return executorService.submit(TtlCallable.get(task)); } /** * 在ExecutorTtlWrapper實現(xiàn)execute方法 */ @Override public void execute(@NonNull Runnable command){ executor.execute(TtlRunnable.get(command)); }
重點的核心邏輯應(yīng)該是在TtlCallable#call()或者TtlRunnable#run()中。以下以TtlCallable為例,TtlRunnable同理類似。在分析call()方法之前,先看一個類Transmitter。
public static class Transmitter { /** * 捕獲當(dāng)前線程中的是所有TransimittableThreadLocal和注冊ThreadLocal的值。 */ @NonNull public static Object capture(){ return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } /** * 捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。 */ private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>(); for(TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } /** * 捕獲注冊的ThreadLocal的值,也就是原本線程中的ThreadLocal,可以注冊到TTL中,在 * 進行線程池本地變量傳遞時也會被傳遞。 */ private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() { final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap<ThreadLocal<Object>, Object>(); for(Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); final TtlCopier<Object> copier = entry.getValue(); threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get())); } return threadLocal2Value; } /** * 將捕獲到的本地變量進行替換子線程的本地變量,并且返回子線程現(xiàn)有的本地變量副本backup。 * 用于在執(zhí)行run/call方法之后,將本地變量副本恢復(fù)。 */ @NonNull public static Object replay(@NonNull Object captured) { final Snapshot capturedSnapshot = (Snapshot) captured; return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } /** * 替換TransmittableThreadLocal */ @NonNull private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>,Object> captured) { // 創(chuàng)建副本backup HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext();) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); // 對當(dāng)前線程的本地變量進行副本拷貝 backup.put(threadLocal, threadLocal.get()); // 若出現(xiàn)調(diào)用線程中不存在某個線程變量,而線程池中線程有,則刪除線程池中對應(yīng)的本地變量 if (!captured.containsKey(threadLocal)){ iterator.remove(); threadLocal.superRemove(); } } // 將捕獲的TTL值打入線程池獲取到的線程TTL中。 setTtlValuesTo(captured); // 是一個擴展點,調(diào)用TTL的beforeExecute方法。默認(rèn)實現(xiàn)為空 doExecuteCallback(true); return backup; } private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) { final HashMap<ThreadLocal<Object>, Object> backup = new HashMap<ThreadLocal<Object>, Object>(); for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); backup.put(threadLocal, threadLocal.get()); final Objectvalue = entry.getValue(); if (value == threadLocalClearMark) threadLocal.remove(); else threadLocal.set(value); } return backup; } /** * 清除單線線程的所有TTL和TL,并返回清除之氣的backup */ @NonNull public static Object clear() { final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>(); final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap<ThreadLocal<Object>, Object>(); for (Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry : threadLocalHolder.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); threadLocal2Value.put(threadLocal, threadLocalClearMark); } return replay(new Snapshot(ttl2Value, threadLocal2Value)); } /** * 還原 */ public static void restore(@NonNull Object backup) { final Snapshot backupSnapshot = (Snapshot) backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); } private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>,Object> backup) { // 擴展點,調(diào)用TTL的afterExecute doExecuteCallback(false); for (finalIterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext();) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); if (!backup.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 將本地變量恢復(fù)成備份版本 setTtlValuesTo(backup); } private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>,Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry :ttlValues.entrySet()) { TransmittableThreadLocal<Object> threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } } private staticvoid restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>,Object> backup) { for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); threadLocal.set(entry.getValue()); } } /** * 快照類,保存TTL和TL */ private static class Snapshot { final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value; final HashMap<ThreadLocal<Object>, Object> threadLocal2Value; private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object>ttl2Value, HashMap<ThreadLocal<Object>, Object> threadLocal2Value) { this.ttl2Value= ttl2Value; this.threadLocal2Value= threadLocal2Value; } } // 進入TtlCallable#call()方法。 @Override public V call() throws Exception { Object captured = capturedRef.get(); if (captured == null|| releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTLvalue reference is released after call!"); } // 調(diào)用replay方法將捕獲到的當(dāng)前線程的本地變量,傳遞給線程池線程的本地變量, // 并且獲取到線程池線程覆蓋之前的本地變量副本。 Object backup = replay(captured); try { // 線程方法調(diào)用 return callable.call(); } finally { // 使用副本進行恢復(fù)。 restore(backup); } } }
線程池方式傳遞本地變量的核心代碼已經(jīng)完畢。總的來說在創(chuàng)建TtlCallable對象是,調(diào)用capture()方法捕獲調(diào)用方的本地線程變量,在call()執(zhí)行時,將捕獲到的線程變量,替換到線程池所對應(yīng)獲取到的線程的本地變量中,并且在執(zhí)行完成之后,將其本地變量恢復(fù)到調(diào)用之前。
TaskDecorator
線程池設(shè)置TaskDecorator,TaskDecorator是什么?
官方釋義:這是一個執(zhí)行回調(diào)方法的裝飾器,主要應(yīng)用于傳遞上下文,或者提供任務(wù)的監(jiān)控/統(tǒng)計信息。
代碼如下
public class ContextTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable){ //獲取父線程的值 String callerPath = TtlParameterWrapper.getCaller(); return () -> { try { // 將主線程的請求信息,設(shè)置到子線程中 TtlParameterWrapper.setCaller(callerPath); // 執(zhí)行子線程,這一步不要忘了 runnable.run(); } finally { // 線程結(jié)束,清空這些信息,否則可能造成內(nèi)存泄漏 TtlParameterWrapper.clear(); } }; } }
TaskDecorator需要結(jié)合線程池使用,實際開發(fā)中異步線程建議使用線程池,只需要在對應(yīng)的線程池配置一下
代碼
@Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor(); poolTaskExecutor.setCorePoolSize(xx); poolTaskExecutor.setMaxPoolSize(xx); // 設(shè)置線程活躍時間(秒) poolTaskExecutor.setKeepAliveSeconds(xx); // 設(shè)置隊列容量 poolTaskExecutor.setQueueCapacity(xx); //設(shè)置TaskDecorator,用于解決父子線程間的數(shù)據(jù)復(fù)用 poolTaskExecutor.setTaskDecorator(new ContextTaskDecorator()); poolTaskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任務(wù)結(jié)束后再關(guān)閉線程池 poolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); return poolTaskExecutor; }
此時業(yè)務(wù)代碼就不需要去設(shè)置子線程的值,直接使用即可
代碼
public void handlerAsync() { log.info("父線程的用戶信息 -> {}", TtlParameterWrapper.get()); //執(zhí)行異步任務(wù),需要指定的線程池 CompletableFuture.runAsync(() -> log.info("子線程的用戶信息 -> {}", TtlParameterWrapper.get() ),taskExecutor); }
這里使用的是CompletableFuture執(zhí)行異步任務(wù),使用@Async這個注解同樣是可行的。
注意:無論使用何種方式,都需要指定線程池
到此這篇關(guān)于Java線程之間數(shù)據(jù)傳遞的實現(xiàn)示例的文章就介紹到這了,更多相關(guān)Java線程之間數(shù)據(jù)傳遞內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JDK源碼之線程并發(fā)協(xié)調(diào)神器CountDownLatch和CyclicBarrier詳解
我一直認(rèn)為程序是對于現(xiàn)實世界的邏輯描述,而在現(xiàn)實世界中很多事情都需要各方協(xié)調(diào)合作才能完成,就好比完成一個平臺的交付不可能只靠一個人,而需要研發(fā)、測試、產(chǎn)品以及項目經(jīng)理等不同角色人員進行通力合作才能完成最終的交付2022-02-02使用HandlerMethodArgumentResolver用于統(tǒng)一獲取當(dāng)前登錄用戶
這篇文章主要介紹了使用HandlerMethodArgumentResolver用于統(tǒng)一獲取當(dāng)前登錄用戶實例,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12SpringBoot?SpringSecurity?詳細(xì)介紹(基于內(nèi)存的驗證)
這篇文章主要介紹了SpringBoot?SpringSecurity?介紹(基于內(nèi)存的驗證),本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-04-04SpringBoot+JWT實現(xiàn)單點登錄完美解決方案
單點登錄是一種統(tǒng)一認(rèn)證和授權(quán)機制,指在多個應(yīng)用系統(tǒng)中,用戶只需要登錄一次就可以訪問所有相互信任的系統(tǒng),不需要重新登錄驗證,這篇文章主要介紹了SpringBoot+JWT實現(xiàn)單點登錄解決方案,需要的朋友可以參考下2023-07-07