spring?Cloud微服務阿里開源TTL身份信息的線程間復用
引言
前面在介紹分布式鏈路追蹤時講過異步調用會丟失鏈路信息,最終的解決方案是使用對應的包裝類重新包裝一下,如下:
RunnableWrapper
CallableWrapper
SupplierWrapper
還有openFeign異步請求丟失上文的問題,這些問題追根究底都是ThreadLocal惹得禍。
由于ThreadLocal只能保存當前線程的信息,不能實現(xiàn)父子線程的繼承。
說到這,很多人想到了InheritableThreadLocal,確實InheritableThreadLocal能夠實現(xiàn)父子線程間傳遞本地變量,但是.....
但是你的程序如果采用線程池,則存在著線程復用的情況,這時就不一定能夠實現(xiàn)父子線程間傳遞了,因為在線程在線程池中的存在不是每次使用都會進行創(chuàng)建,InheritableThreadlocal
是在線程初始化時intertableThreadLocals=true
才會進行拷貝傳遞。
所以若本次使用的子線程是已經(jīng)被池化的線程,從線程池中取出線下進行使用,是沒有經(jīng)過初始化的過程,也就不會進行父子線程的本地變量拷貝。
由于在日常應用場景中,絕大多數(shù)都是會采用線程池的方式進行資源的有效管理。
今天就來聊一聊阿里的ThansmittableThreadLocal是如何解決線程池中父子線程本地變量傳遞。
InheritableThreadLocal 的問題
在介紹ThansmittableThreadLocal
之前先來看一下InheritableThreadLocal
在線程池中的問題,如下代碼:
@Test public?void?test()?throws?Exception?{ ????//單一線程池 ????ExecutorService?executorService?=?Executors.newSingleThreadExecutor(); ????//InheritableThreadLocal存儲 ????InheritableThreadLocal<String>?username?=?new?InheritableThreadLocal<>(); ????for?(int?i?=?0;?i?<?10;?i++)?{ ????username.set("公眾號:腳本之家—"+i); ????Thread.sleep(3000); ????CompletableFuture.runAsync(()->?System.out.println(username.get()),executorService); ???} }
上述代碼中創(chuàng)建了一個單一線程池,循環(huán)異步調用,打印一下username,由于核心線程數(shù)是1,勢必存在線程的復用。
打印信息如下:
公眾號:腳本之家—0
公眾號:腳本之家—0
公眾號:腳本之家—0
公眾號:腳本之家—0
公眾號:腳本之家—0
公眾號:腳本之家—0
公眾號:腳本之家—0
公眾號:腳本之家—0
公眾號:腳本之家—0
公眾號:腳本之家—0
看到了嗎?這里并沒有實現(xiàn)父子線程間的變量傳遞,這也就是InheritableThreadLocal 的局限性。
TransmittableThreadLocal 使用
TransmittableThreadLocal
(TTL
):在使用線程池等會池化復用線程的執(zhí)行組件情況下,提供ThreadLocal
值的傳遞功能,解決異步執(zhí)行時上下文傳遞的問題。
整個TransmittableThreadLocal
庫的核心功能(用戶API
與框架/中間件的集成API
、線程池ExecutorService
/ForkJoinPool
/TimerTask
及其線程工廠的Wrapper
)。
需求場景:
- 分布式跟蹤系統(tǒng) 或 全鏈路壓測(即鏈路打標)
- 日志收集記錄系統(tǒng)上下文
官網(wǎng)地址:https://github.com/alibaba/transmittable-thread-local
下面就以上面的例子改造成TransmittableThreadLocal試一下效果。
首選需要引入對應的依賴,如下:
<dependency> ????<groupId>com.alibaba</groupId> ????<artifactId>transmittable-thread-local</artifactId> </dependency
改造后的代碼如下:
@Test public?void?test()?throws?Exception?{ ????//單一線程池 ????ExecutorService?executorService?=?Executors.newSingleThreadExecutor(); ????//需要使用TtlExecutors對線程池包裝一下 ????executorService=TtlExecutors.getTtlExecutorService(executorService); ????//TransmittableThreadLocal創(chuàng)建 ????TransmittableThreadLocal<String>?username?=?new?TransmittableThreadLocal<>(); ????for?(int?i?=?0;?i?<?10;?i++)?{ ????username.set("公眾號:https://github.com/alibaba/transmittable-thread-local—"+i); ????Thread.sleep(3000); ????CompletableFuture.runAsync(()->?System.out.println(username.get()),executorService); ??} }
需要注意的是需要使用TtlExecutors
對線程池進行包裝,代碼如下:
executorService=TtlExecutors.getTtlExecutorService(executorService);
運行效果如下:
公眾號:腳本之家—0
公眾號:腳本之家—1
公眾號:腳本之家—2
公眾號:腳本之家—3
公眾號:腳本之家—4
公眾號:腳本之家—5
公眾號:腳本之家—6
公眾號:腳本之家—7
公眾號:腳本之家—8
公眾號:腳本之家—9
可以看到已經(jīng)能夠實現(xiàn)了線程池中的父子線程的數(shù)據(jù)傳遞。
在每次調用任務的時,都會將當前的主線程的TTL數(shù)據(jù)copy到子線程里面,執(zhí)行完成后,再清除掉。同時子線程里面的修改回到主線程時其實并沒有生效。這樣可以保證每次任務執(zhí)行的時候都是互不干涉。
簡單應用
在 Spring Security 往往需要存儲用戶登錄的詳細信息,這樣在業(yè)務方法中能夠隨時獲取用戶的信息。
在前面的Spring Cloud Gateway整合OAuth2.0實現(xiàn)統(tǒng)一認證鑒權 文章中筆者是將用戶信息直接存儲在Request中,這樣每次請求都能獲取到對應的信息。
其實Request中的信息存儲也是通過ThreadLocal完成的,在異步執(zhí)行的時候還是需要重新轉存,這樣一來代碼就變得復雜。
那么了解了TransmittableThreadLocal 之后,完全可以使用這個存儲用戶的登錄信息,實現(xiàn)如下:
/** ?*?@description?使用TransmittableThreadLocal存儲用戶身份信息LoginVal ?*/ public?class?SecurityContextHolder?{ ????//使用TTL存儲身份信息 ????private?static?final?TransmittableThreadLocal<LoginVal>?THREAD_LOCAL?=?new?TransmittableThreadLocal<>(); ????public?static?void?set(LoginVal?loginVal){ ????????THREAD_LOCAL.set(loginVal); ????} ????public?static?LoginVal?get(){ ????????return?THREAD_LOCAL.get(); ????} ????public?static?void?remove(){ ????????THREAD_LOCAL.remove(); ????} }
由于mvc中的一次請求對應一個線程,因此只需要在攔截器中的設置和移除TransmittableThreadLocal中的信息,代碼如下:
/** ?*?@description?攔截器,在preHandle中解析請求頭的中的token信息,將其放入SecurityContextHolder中 ?*??????????????????????在afterCompletion方法中移除對應的ThreadLocal中信息 ?*??????????????????????確保每個請求的用戶信息獨立 ?*/ @Component public?class?AuthInterceptor?implements?AsyncHandlerInterceptor?{ ????/** ?????*?在執(zhí)行controller方法之前將請求頭中的token信息解析出來,放入SecurityContextHolder中(TransmittableThreadLocal) ?????*/ ????@Override ????public?boolean?preHandle(HttpServletRequest?request,?HttpServletResponse?response,?Object?handler)?{ ????????if?(!(handler?instanceof?HandlerMethod)) ????????????return?true; ????????//獲取請求頭中的加密的用戶信息 ????????String?token?=?request.getHeader(OAuthConstant.TOKEN_NAME); ????????if?(StrUtil.isBlank(token)) ????????????return?true; ????????//解密 ????????String?json?=?Base64.decodeStr(token); ????????//將json解析成LoginVal ????????LoginVal?loginVal?=?TokenUtils.parseJsonToLoginVal(json); ????????//封裝數(shù)據(jù)到ThreadLocal中 ????????SecurityContextHolder.set(loginVal); ????????return?true; ????} ????/** ?????*?在視圖渲染之后執(zhí)行,意味著一次請求結束,清除TTL中的身份信息 ?????*/ ????@Override ????public?void?afterCompletion(HttpServletRequest?request,?HttpServletResponse?response,?Object?handler,?Exception?ex){ ????????SecurityContextHolder.remove(); ????} }
原理
從定義來看,TransimittableThreadLocal
繼承于InheritableThreadLocal
,并實現(xiàn)TtlCopier
接口,它里面只有一個copy
方法。所以主要是對InheritableThreadLocal
的擴展。
public?class?TransmittableThreadLocal<T>?extends?InheritableThreadLocal<T>?implements?TtlCopier<T>?
在TransimittableThreadLocal
中添加holder
屬性。這個屬性的作用就是被標記為具備線程傳遞資格的對象都會被添加到這個對象中。
要標記一個類,比較容易想到的方式,就是給這個類新增一個Type
字段,還有一個方法就是將具備這種類型的的對象都添加到一個靜態(tài)全局集合中。之后使用時,這個集合里的所有值都具備這個標記。
//?1.?holder本身是一個InheritableThreadLocal對象 //?2.?這個holder對象的value是WeakHashMap<TransmittableThreadLocal<Object>,??> //?? 2.1 WeekHashMap的value總是null,且不可能被使用。 //????2.2?WeekHasshMap支持value=null private?static?InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,??>>?holder?=?new?InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>,??>>()?{ ??@Override ??protected?WeakHashMap<TransmittableThreadLocal<Object>,??>?initialValue()?{ ????return?new?WeakHashMap<TransmittableThreadLocal<Object>,?Object>(); ??} ? ??/** ???*?重寫了childValue方法,實現(xiàn)上直接將父線程的屬性作為子線程的本地變量對象。 ???*/ ??@Override ??protected?WeakHashMap<TransmittableThreadLocal<Object>,??>?childValue(WeakHashMap<TransmittableThreadLocal<Object>,??>?parentValue)?{ ????return?new?WeakHashMap<TransmittableThreadLocal<Object>,?Object>(parentValue); ??} };
應用代碼是通過TtlExecutors
工具類對線程池對象進行包裝。工具類只是簡單的判斷,輸入的線程池是否已經(jīng)被包裝過、非空校驗等,然后返回包裝類ExecutorServiceTtlWrapper
。根據(jù)不同的線程池類型,有不同和的包裝類。
@Nullable public?static?ExecutorService?getTtlExecutorService(@Nullable?ExecutorService?executorService)?{ ??if?(TtlAgent.isTtlAgentLoaded()?||?executorService?==?null?||?executorService?instanceof?TtlEnhanced)?{ ????return?executorService; ??} ??return?new?ExecutorServiceTtlWrapper(executorService); }
進入包裝類ExecutorServiceTtlWrapper
??梢宰⒁獾讲徽撌峭ㄟ^ExecutorServiceTtlWrapper#submit
方法或者是ExecutorTtlWrapper#execute
方法,都會將線程對象包裝成TtlCallable
或者TtlRunnable
,用于在真正執(zhí)行run
方法前做一些業(yè)務邏輯。
/** ?*?在ExecutorServiceTtlWrapper實現(xiàn)submit方法 ?*/ @NonNull @Override public?<T>?Future<T>?submit(@NonNull?Callable<T>?task)?{ ??return?executorService.submit(TtlCallable.get(task)); } /** ?*?在ExecutorTtlWrapper實現(xiàn)execute方法 ?*/ @Override public?void?execute(@NonNull?Runnable?command)?{ ??executor.execute(TtlRunnable.get(command)); }
所以,重點的核心邏輯應該是在TtlCallable#call()
或者TtlRunnable#run()
中。以下以TtlCallable
為例,TtlRunnable
同理類似。在分析call()
方法之前,先看一個類Transmitter
public?static?class?Transmitter?{ ??/** ????*?捕獲當前線程中的是所有TransimittableThreadLocal和注冊ThreadLocal的值。 ????*/ ??@NonNull ??public?static?Object?capture()?{ ????return?new?Snapshot(captureTtlValues(),?captureThreadLocalValues()); ??} ? ????/** ????*?捕獲TransimittableThreadLocal的值,將holder中的所有值都添加到HashMap后返回。 ????*/ ??private?static?HashMap<TransmittableThreadLocal<Object>,?Object>?captureTtlValues()?{ ????HashMap<TransmittableThreadLocal<Object>,?Object>?ttl2Value?=? ??????new?HashMap<TransmittableThreadLocal<Object>,?Object>(); ????for?(TransmittableThreadLocal<Object>?threadLocal?:?holder.get().keySet())?{ ??????ttl2Value.put(threadLocal,?threadLocal.copyValue()); ????} ????return?ttl2Value; ??} ??/** ????*?捕獲注冊的ThreadLocal的值,也就是原本線程中的ThreadLocal,可以注冊到TTL中,在 ????*?進行線程池本地變量傳遞時也會被傳遞。 ????*/ ??private?static?HashMap<ThreadLocal<Object>,?Object>?captureThreadLocalValues()?{ ????final?HashMap<ThreadLocal<Object>,?Object>?threadLocal2Value?=? ??????new?HashMap<ThreadLocal<Object>,?Object>(); ????for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){ ??????final?ThreadLocal<Object>?threadLocal?=?entry.getKey(); ??????final?TtlCopier<Object>?copier?=?entry.getValue(); ??????threadLocal2Value.put(threadLocal,?copier.copy(threadLocal.get())); ????} ????return?threadLocal2Value; ??} ??/** ????*?將捕獲到的本地變量進行替換子線程的本地變量,并且返回子線程現(xiàn)有的本地變量副本backup。 ????*?用于在執(zhí)行run/call方法之后,將本地變量副本恢復。 ????*/ ??@NonNull ??public?static?Object?replay(@NonNull?Object?captured)?{ ????final?Snapshot?capturedSnapshot?=?(Snapshot)?captured; ????return?new?Snapshot(replayTtlValues(capturedSnapshot.ttl2Value),? ????????????????????????replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); ??} ? ??/** ????*?替換TransmittableThreadLocal ????*/ ??@NonNull ??private?static?HashMap<TransmittableThreadLocal<Object>,?Object>?replayTtlValues(@NonNull?HashMap<TransmittableThreadLocal<Object>,?Object>?captured)?{ ????//?創(chuàng)建副本backup ????HashMap<TransmittableThreadLocal<Object>,?Object>?backup?=? ??????new?HashMap<TransmittableThreadLocal<Object>,?Object>(); ????for?(final?Iterator<TransmittableThreadLocal<Object>>?iterator?=?holder.get().keySet().iterator();?iterator.hasNext();?)?{ ??????TransmittableThreadLocal<Object>?threadLocal?=?iterator.next(); ??????//?對當前線程的本地變量進行副本拷貝 ??????backup.put(threadLocal,?threadLocal.get()); ??????//?若出現(xiàn)調用線程中不存在某個線程變量,而線程池中線程有,則刪除線程池中對應的本地變量 ??????if?(!captured.containsKey(threadLocal))?{ ????????iterator.remove(); ????????threadLocal.superRemove(); ??????} ????} ????//?將捕獲的TTL值打入線程池獲取到的線程TTL中。 ????setTtlValuesTo(captured); ????//?是一個擴展點,調用TTL的beforeExecute方法。默認實現(xiàn)為空 ????doExecuteCallback(true); ????return?backup; ??} ??private?static?HashMap<ThreadLocal<Object>,?Object>?replayThreadLocalValues(@NonNull?HashMap<ThreadLocal<Object>,?Object>?captured)?{ ????final?HashMap<ThreadLocal<Object>,?Object>?backup?=? ??????new?HashMap<ThreadLocal<Object>,?Object>(); ????for?(Map.Entry<ThreadLocal<Object>,?Object>?entry?:?captured.entrySet())?{ ??????final?ThreadLocal<Object>?threadLocal?=?entry.getKey(); ??????backup.put(threadLocal,?threadLocal.get()); ??????final?Object?value?=?entry.getValue(); ??????if?(value?==?threadLocalClearMark)?threadLocal.remove(); ??????else?threadLocal.set(value); ????} ????return?backup; ??} ??/** ????*?清除單線線程的所有TTL和TL,并返回清除之氣的backup ????*/ ??@NonNull ??public?static?Object?clear()?{ ????final?HashMap<TransmittableThreadLocal<Object>,?Object>?ttl2Value?=? ??????new?HashMap<TransmittableThreadLocal<Object>,?Object>(); ????final?HashMap<ThreadLocal<Object>,?Object>?threadLocal2Value?=? ??????new?HashMap<ThreadLocal<Object>,?Object>(); ????for(Map.Entry<ThreadLocal<Object>,TtlCopier<Object>>entry:threadLocalHolder.entrySet()){ ??????final?ThreadLocal<Object>?threadLocal?=?entry.getKey(); ??????threadLocal2Value.put(threadLocal,?threadLocalClearMark); ????} ????return?replay(new?Snapshot(ttl2Value,?threadLocal2Value)); ??} ??/** ????*?還原 ????*/ ??public?static?void?restore(@NonNull?Object?backup)?{ ????final?Snapshot?backupSnapshot?=?(Snapshot)?backup; ????restoreTtlValues(backupSnapshot.ttl2Value); ????restoreThreadLocalValues(backupSnapshot.threadLocal2Value); ??} ??private?static?void?restoreTtlValues(@NonNull?HashMap<TransmittableThreadLocal<Object>,?Object>?backup)?{ ????//?擴展點,調用TTL的afterExecute ????doExecuteCallback(false); ????for?(final?Iterator<TransmittableThreadLocal<Object>>?iterator?=?holder.get().keySet().iterator();?iterator.hasNext();?)?{ ??????TransmittableThreadLocal<Object>?threadLocal?=?iterator.next(); ??????if?(!backup.containsKey(threadLocal))?{ ????????iterator.remove(); ????????threadLocal.superRemove(); ??????} ????} ????//?將本地變量恢復成備份版本 ????setTtlValuesTo(backup); ??} ??private?static?void?setTtlValuesTo(@NonNull?HashMap<TransmittableThreadLocal<Object>,?Object>?ttlValues)?{ ????for?(Map.Entry<TransmittableThreadLocal<Object>,?Object>?entry?:?ttlValues.entrySet())?{ ??????TransmittableThreadLocal<Object>?threadLocal?=?entry.getKey(); ??????threadLocal.set(entry.getValue()); ????} ??} ??private?static?void?restoreThreadLocalValues(@NonNull?HashMap<ThreadLocal<Object>,?Object>?backup)?{ ????for?(Map.Entry<ThreadLocal<Object>,?Object>?entry?:?backup.entrySet())?{ ??????final?ThreadLocal<Object>?threadLocal?=?entry.getKey(); ??????threadLocal.set(entry.getValue()); ????} ??} ??/** ???*?快照類,保存TTL和TL ???*/ ??private?static?class?Snapshot?{ ????final?HashMap<TransmittableThreadLocal<Object>,?Object>?ttl2Value; ????final?HashMap<ThreadLocal<Object>,?Object>?threadLocal2Value; ????private?Snapshot(HashMap<TransmittableThreadLocal<Object>,?Object>?ttl2Value, ?????????????????????HashMap<ThreadLocal<Object>,?Object>?threadLocal2Value)?{ ??????this.ttl2Value?=?ttl2Value; ??????this.threadLocal2Value?=?threadLocal2Value; ????} ??}
進入TtlCallable#call()
方法。
@Override public?V?call()?throws?Exception?{ ??Object?captured?=?capturedRef.get(); ??if?(captured?==?null?||?releaseTtlValueReferenceAfterCall?&&? ??????!capturedRef.compareAndSet(captured,?null))?{ ????throw?new?IllegalStateException("TTL?value?reference?is?released?after?call!"); ??} ??//?調用replay方法將捕獲到的當前線程的本地變量,傳遞給線程池線程的本地變量, ??//?并且獲取到線程池線程覆蓋之前的本地變量副本。 ??Object?backup?=?replay(captured); ??try?{ ????//?線程方法調用 ????return?callable.call(); ??}?finally?{ ????//?使用副本進行恢復。 ????restore(backup); ??} }
到這基本上線程池方式傳遞本地變量的核心代碼已經(jīng)大概看完了??偟膩碚f在創(chuàng)建TtlCallable
對象是,調用capture()
方法捕獲調用方的本地線程變量,在call()
執(zhí)行時,將捕獲到的線程變量,替換到線程池所對應獲取到的線程的本地變量中,并且在執(zhí)行完成之后,將其本地變量恢復到調用之前。
總結
本文介紹了使用阿里開源的TransmittableThreadLocal 優(yōu)雅的實現(xiàn)父子線程的數(shù)據(jù)傳遞,應用場景很多,企業(yè)中應用也比較廣泛。
- 基于jib-maven-plugin插件快速構建微服務docker鏡像的方法
- 微服務鏈路追蹤Spring Cloud Sleuth整合Zipkin解析
- Java微服務Filter過濾器集成Sentinel實現(xiàn)網(wǎng)關限流過程詳解
- Java微服務分布式調度Elastic-job環(huán)境搭建及配置
- Java微服務Nacos Config配置中心超詳細講解
- SpringCloud微服務中跨域配置的方法詳解
- Java Feign微服務接口調用方法詳細講解
- go微服務PolarisMesh源碼解析服務端啟動流程
- 微服務Spring Boot 整合 Redis 實現(xiàn)UV 數(shù)據(jù)統(tǒng)計的詳細過程
- go-micro微服務JWT跨域認證問題
- 詳解go-micro微服務consul配置及注冊中心
- go-micro微服務domain層開發(fā)示例詳解
- 微服務?Spring?Boot?整合?Redis?BitMap?實現(xiàn)?簽到與統(tǒng)計功能
- 一文帶你了解微服務架構中的"發(fā)件箱模式"
- go?micro微服務框架項目搭建方法
- go?micro微服務proto開發(fā)安裝及使用規(guī)則
- Mybatis與微服務注冊的詳細過程
- 簡單介紹一下什么是microservice微服務
相關文章
spring cloud 集成 ribbon負載均衡的實例代碼
spring Cloud Ribbon 是一個客戶端的負載均衡器,它提供對大量的HTTP和TCP客戶端的訪問控制。本文給大家介紹spring cloud 集成 ribbon負載均衡,感興趣的朋友跟隨小編一起看看吧2021-11-11Maven中plugins與pluginManagement的區(qū)別說明
這篇文章主要介紹了Maven中plugins與pluginManagement的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09Java如何生成4位、6位隨機數(shù)短信驗證碼(高效實現(xiàn))
這篇文章主要介紹了Java如何生成4位、6位隨機數(shù)短信驗證碼(高效實現(xiàn)),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12springboot+springsecurity如何實現(xiàn)動態(tài)url細粒度權限認證
這篇文章主要介紹了springboot+springsecurity如何實現(xiàn)動態(tài)url細粒度權限認證的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06