利用ThreadLocal實現一個上下文管理組件
本文基于ThreadLocal
原理,實現了一個上下文狀態(tài)管理組件Scope
,通過開啟一個自定義的Scope
,在Scope
范圍內,可以通過Scope
各個方法讀寫數據;
通過自定義線程池實現上下文狀態(tài)數據的線程間傳遞;
提出了一種基于Filter
和Scope
的Request
粒度的上下文管理方案。
github:https://github.com/pengchengSU/demo-request-scope
1 ThreadLocal原理
ThreadLocal
主要作用就是實現線程間變量隔離,對于一個變量,每個線程維護一個自己的實例,防止多線程環(huán)境下的資源競爭,那ThreadLocal
是如何實現這一特性的呢?
圖1
從上圖可知:
- 每個
Thread
對象中都包含一個ThreadLocal.ThreadLocalMap
類型的threadlocals
成員變量; - 該map對應的每個元素
Entry
對象中:key是ThreadLocal
對象的弱引用,value是該threadlocal
變量在當前線程中的對應的變量實體; - 當某一線程執(zhí)行獲取該
ThreadLocal
對象對應的變量時,首先從當前線程對象中獲取對應的threadlocals
哈希表,再以該ThreadLocal
對象為key查詢哈希表中對應的value; - 由于每個線程獨占一個
threadlocals
哈希表,因此線程間ThreadLocal
對象對應的變量實體也是獨占的,不存在競爭問題,也就避免了多線程問題。
有人可能會問:ThreadLocalMap
是Thread
成員變量(非public,只有包訪問權限,Thread和Threadlocal都在java.lang 包下,Thread可以訪問ThreadLocal.ThreadLocalMap),定義卻在ThreadLocal中,為什么要這么設計?
源碼的注釋給出了解釋:ThreadLocalMap
就是維護線程本地變量設計的,就是讓使用者知道ThreadLocalMap
就只做保存線程局部變量這一件事。
set() 方法
public void set(T value) { Thread t = Thread.currentThread(); //獲取當前線程 ThreadLocalMap map = getMap(t); //從當前線程對象中獲取threadlocals,該map保存了所用的變量實例 if (map != null) { map.set(this, value); } else { createMap(t, value); //初始threadlocals,并設置當前變量 } }
ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
get() 方法
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); //從當前線程對象中獲取threadlocals,該map保存了所用的變量實體 if (map != null) { // 獲取當前threadlocal對象對應的變量實體 ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } // 如果map沒有初始化,那么在這里初始化一下 return setInitialValue(); }
withInitial()方法
由于通過 ThreadLocal
的 set()
設置的值,只會設置當前線程對應變量實體,無法實現統(tǒng)一初始化所有線程的ThreadLocal
的值。ThreadLocal
提供了一個 withInitial()
方法實現這一功能:
ThreadLocal<String> initValue = ThreadLocal.withInitial(() -> "initValue");
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) { // 返回SuppliedThreadLocal類型對象 return new SuppliedThreadLocal<>(supplier); }
static final class SuppliedThreadLocal<T> extends ThreadLocal<T> { private final Supplier<? extends T> supplier; SuppliedThreadLocal(Supplier<? extends T> supplier) { this.supplier = Objects.requireNonNull(supplier); } @Override protected T initialValue() { // 獲取初始化值 return supplier.get(); } }
ThreadLocal中的內存泄漏問題
由圖1可知,ThreadLocal.ThreadLocalMap
對應的Entry
中,key為ThreadLocal
對象的弱引用,方法執(zhí)行對應棧幀中的ThreadLocal
引用為強引用。當方法執(zhí)行過程中,由于棧幀銷毀或者主動釋放等原因,釋放了ThreadLocal
對象的強引用,即表示該ThreadLocal
對象可以被回收了。又因為Entry
中key為ThreadLocal
對象的弱引用,所以當jvm執(zhí)行GC操作時是能夠回收該ThreadLocal
對象的。
而Entry
中value對應的是變量實體對象的強引用,因此釋放一個ThreadLocal
對象,是無法釋放ThreadLocal.ThreadLocalMap
中對應的value對象的,也就造成了內存泄漏。除非釋放當前線程對象,這樣整個threadlocals
都被回收了。但是日常開發(fā)中會經常使用線程池等線程池化技術,釋放線程對象的條件往往無法達到。
JDK處理的方法是,在ThreadLocalMap
進行set()
、get()
、remove()
的時候,都會進行清理:
private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); }
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) //如果key為null,對應的threadlocal對象已經被回收,清理該Entry expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } return null; }
2 自定義上下文Scope
在工作中,我們經常需要維護一些上下文,這樣可以避免在方法調用過程中傳入過多的參數,需要查詢/修改一些數據的時候,直接在當前上下文中操作就行了。舉個具體點的例子:當web服務器收到一個請求時,需要解析當前登錄態(tài)的用戶,在后續(xù)的業(yè)務執(zhí)行流程中都需要這個用戶名。
如果只需要維護一個上下文狀態(tài)數據還比較好處理,可以通過方法傳參的形式,執(zhí)行每個業(yè)務方法的時候都通過添加一個表示用戶名方法參數傳遞進去,但是如果需要維護上下文狀態(tài)數據比較多的話,這個方式就不太優(yōu)雅了。
一個可行的方案是通過Threadlocal
實現請求線程的上下文,只要是同一線程的執(zhí)行過程,不同方法間不傳遞上下文狀態(tài)變量,直接操作ThreadLocal
對象實現狀態(tài)數據的讀寫。當存在多個上下文狀態(tài)的話,則需要維護多個ThreadLocal
,似乎也可以勉強接受。但是當遇到業(yè)務流程中使用線程池的情況下,從Tomcat傳遞這些ThreadLocal
到線程池中的線程里就變的比較麻煩了。
基于以上考慮,下面介紹一種基于Threadlocal
實現的上下文管理組件Scope
:
Scope.java
public class Scope { // 靜態(tài)變量,維護不同線程的上下文Scope private static final ThreadLocal<Scope> SCOPE_THREAD_LOCAL = new ThreadLocal<>(); // 實例變量,維護每個上下文中所有的狀態(tài)數據,為了區(qū)分不同的狀態(tài)數據,使用ScopeKey類型的實例作為key private final ConcurrentMap<ScopeKey<?>, Object> values = new ConcurrentHashMap<>(); // 獲取當前上下文 public static Scope getCurrentScope() { return SCOPE_THREAD_LOCAL.get(); } // 在當前上下文設置一個狀態(tài)數據 public <T> void set(ScopeKey<T> key, T value) { if (value != null) { values.put(key, value); } else { values.remove(key); } } // 在當前上下文讀取一個狀態(tài)數據 public <T> T get(ScopeKey<T> key) { T value = (T) values.get(key); if (value == null && key.initializer() != null) { value = key.initializer().get(); } return value; } // 開啟一個上下文 public static Scope beginScope() { Scope scope = SCOPE_THREAD_LOCAL.get(); if (scope != null) { throw new IllegalStateException("start a scope in an exist scope."); } scope = new Scope(); SCOPE_THREAD_LOCAL.set(scope); return scope; } // 關閉當前上下文 public static void endScope() { SCOPE_THREAD_LOCAL.remove(); } }
ScopeKey.java
public final class ScopeKey<T> { // 初始化器,參考 ThreadLocal 的 withInitial() private final Supplier<T> initializer; public ScopeKey() { this(null); } public ScopeKey(Supplier<T> initializer) { this.initializer = initializer; } // 統(tǒng)一初始化所有線程的 ScopeKey 對應的值,參考 ThreadLocal 的 withInitial() public static <T> ScopeKey<T> withInitial(Supplier<T> initializer) { return new ScopeKey<>(initializer); } public Supplier<T> initializer() { return this.initializer; } // 獲取當前上下文中 ScopeKey 對應的變量 public T get() { Scope currentScope = getCurrentScope(); return currentScope.get(this); } // 設置當前上下文中 ScopeKey 對應的變量 public boolean set(T value) { Scope currentScope = getCurrentScope(); if (currentScope != null) { currentScope.set(this, value); return true; } else { return false; } } }
使用方式
@Test public void testScopeKey() { ScopeKey<String> localThreadName = new ScopeKey<>(); // 不同線程中執(zhí)行時,開啟獨占的 Scope Runnable r = () -> { // 開啟 Scope Scope.beginScope(); try { String currentThreadName = Thread.currentThread().getName(); localThreadName.set(currentThreadName); log.info("currentThread: {}", localThreadName.get()); } finally { // 關閉 Scope Scope.endScope(); } }; new Thread(r, "thread-1").start(); new Thread(r, "thread-2").start(); /** 執(zhí)行結果 * [thread-1] INFO com.example.demo.testscope.TestScope - currentThread: thread-1 * [thread-2] INFO com.example.demo.testscope.TestScope - currentThread: thread-2 */ } @Test public void testWithInitial() { ScopeKey<String> initValue = ScopeKey.withInitial(() -> "initVal"); Runnable r = () -> { Scope.beginScope(); try { log.info("initValue: {}", initValue.get()); } finally { Scope.endScope(); } }; new Thread(r, "thread-1").start(); new Thread(r, "thread-2").start(); /** 執(zhí)行結果 * [thread-1] INFO com.example.demo.testscope.TestScope - initValue: initVal * [thread-2] INFO com.example.demo.testscope.TestScope - initValue: initVal */ }
上面的測試用例中在代碼中手動開啟和關閉Scope
不太優(yōu)雅,可以在Scope
中添加兩個個靜態(tài)方法包裝下Runnable
和Supplier
接口:
public static <X extends Throwable> void runWithNewScope(@Nonnull ThrowableRunnable<X> runnable) throws X { supplyWithNewScope(() -> { runnable.run(); return null; }); } public static <T, X extends Throwable> T supplyWithNewScope(@Nonnull ThrowableSupplier<T, X> supplier) throws X { beginScope(); try { return supplier.get(); } finally { endScope(); } }
@FunctionalInterface public interface ThrowableRunnable<X extends Throwable> { void run() throws X; } public interface ThrowableSupplier<T, X extends Throwable> { T get() throws X; }
以新的Scope
執(zhí)行,可以這樣寫:
@Test public void testRunWithNewScope() { ScopeKey<String> localThreadName = new ScopeKey<>(); ThrowableRunnable r = () -> { String currentThreadName = Thread.currentThread().getName(); localThreadName.set(currentThreadName); log.info("currentThread: {}", localThreadName.get()); }; // 不同線程中執(zhí)行時,開啟獨占的 Scope new Thread(() -> Scope.runWithNewScope(r), "thread-1").start(); new Thread(() -> Scope.runWithNewScope(r), "thread-2").start(); /** 執(zhí)行結果 * [thread-2] INFO com.example.demo.TestScope.testscope - currentThread: thread-2 * [thread-1] INFO com.example.demo.TestScope.testscope - currentThread: thread-1 */ }
3 在線程池中傳遞Scope
在上一節(jié)中實現的Scope
,通過ThreadLocal
實現了了一個自定義的上下文組件,在同一個線程中通過ScopeKey.set()
/ ScopeKey.get()
讀寫同一個上下文中的狀態(tài)數據。
現在需要實現這樣一個功能,在一個線程執(zhí)行過程中開啟了一個Scope
,隨后使用線程池執(zhí)行任務,要求在線程池中也能獲取當前Scope
中的狀態(tài)數據。典型的使用場景是:服務收到一個用戶請求,通過Scope
將登陸態(tài)數據存到當前線程的上下文中,隨后使用線程池執(zhí)行一些耗時的操作,希望線程池中的線程也能拿到Scope
中的登陸態(tài)數據。
由于線程池中的線程和請求線程不是一個線程,按照目前的實現,線程池中的線程是無法拿到請求線程上下文中的數據的。
解決方法是,在提交runnable
時,將當前的Scope
引用存到runnable
對象中,當獲得線程執(zhí)行時,將Scope
替換到執(zhí)行線程中,執(zhí)行完成后,再恢復現場。在Scope
中新增如下靜態(tài)方法:
// 以給定的上下文執(zhí)行 Runnable public static <X extends Throwable> void runWithExistScope(Scope scope, ThrowableRunnable<X> runnable) throws X { supplyWithExistScope(scope, () -> { runnable.run(); return null; }); } // 以給定的上下文執(zhí)行 Supplier public static <T, X extends Throwable> T supplyWithExistScope(Scope scope, ThrowableSupplier<T, X> supplier) throws X { // 保留現場 Scope oldScope = SCOPE_THREAD_LOCAL.get(); // 替換成外部傳入的 Scope SCOPE_THREAD_LOCAL.set(scope); try { return supplier.get(); } finally { if (oldScope != null) { // 恢復線程 SCOPE_THREAD_LOCAL.set(oldScope); } else { SCOPE_THREAD_LOCAL.remove(); } } }
實現支持Scope
切換的自定義線程池ScopeThreadPoolExecutor
:
public class ScopeThreadPoolExecutor extends ThreadPoolExecutor { ScopeThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public static ScopeThreadPoolExecutor newFixedThreadPool(int nThreads) { return new ScopeThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } /** * 只要override這一個方法就可以 * 所有submit, invokeAll等方法都會代理到這里來 */ @Override public void execute(Runnable command) { Scope scope = getCurrentScope(); // 提交任務時,把執(zhí)行 execute 方法的線程中的 Scope 傳進去 super.execute(() -> runWithExistScope(scope, command::run)); } }
測試下ScopeThreadPoolExecutor
是否生效:
@Test public void testScopeThreadPoolExecutor() { ScopeKey<String> localVariable = new ScopeKey<>(); Scope.beginScope(); try { localVariable.set("value out of thread pool"); Runnable r = () -> log.info("localVariable in thread pool: {}", localVariable.get()); // 使用線程池執(zhí)行,能獲取到外部Scope中的數據 ExecutorService executor = ScopeThreadPoolExecutor.newFixedThreadPool(10); executor.execute(r); executor.submit(r); } finally { Scope.endScope(); } /** 執(zhí)行結果 * [pool-1-thread-1] INFO com.example.demo.testscope.TestScope - localVariable in thread pool: value out of thread pool * [pool-1-thread-2] INFO com.example.demo.testscope.TestScope - localVariable in thread pool: value out of thread pool */ } @Test public void testScopeThreadPoolExecutor2() { ScopeKey<String> localVariable = new ScopeKey<>(); Scope.runWithNewScope(() -> { localVariable.set("value out of thread pool"); Runnable r = () -> log.info("localVariable in thread pool: {}", localVariable.get()); // 使用線程池執(zhí)行,能獲取到外部Scope中的數據 ExecutorService executor = ScopeThreadPoolExecutor.newFixedThreadPool(10); executor.execute(r); executor.submit(r); }); /** 執(zhí)行結果 * [pool-1-thread-2] INFO com.example.demo.testscope.TestScope - localVariable in thread pool: value out of thread pool * [pool-1-thread-1] INFO com.example.demo.testscope.TestScope - localVariable in thread pool: value out of thread pool */ }
以上兩個測試用例,分別通過手動開啟Scope
、借助runWithNewScope
工具方法自動開啟Scope
兩種方式驗證了自定義線程池ScopeThreadPoolExecutor
的Scope
可傳遞性。
4 通過Filter、Scope實現Request上下文
接下來介紹如何通過Filter
和Scope
實現Request
粒度的Scope
上下文。思路是:通過注入一個攔截器,在進入攔截器后開啟Scope
作為一個請求的上下文,解析Request
對象獲取獲取相關狀態(tài)信息(如登陸用戶),并在Scope
中設置,在離開攔截器時關閉Scope
。
AuthScope.java
// 獲取登錄態(tài)的工具類 public class AuthScope { private static final ScopeKey<String> LOGIN_USER = new ScopeKey<>(); public static String getLoginUser() { return LOGIN_USER.get(); } public static void setLoginUser(String loginUser) { if (loginUser == null) { loginUser = "unknownUser"; } LOGIN_USER.set(loginUser); } }
ScopeFilter.java
@Lazy @Order(0) @Service("scopeFilter") public class ScopeFilter extends OncePerRequestFilter { @Override protected String getAlreadyFilteredAttributeName() { return this.getClass().getName(); } @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { // 開啟Scope beginScope(); try { Cookie[] cookies = request.getCookies(); String loginUser = "unknownUser"; if (cookies != null) { for (Cookie cookie : cookies) { if (cookie.getName().equals("login_user")) { loginUser = cookie.getValue(); break; } } } // 設置該 Request 上下文對用的登陸用戶 AuthScope.setLoginUser(loginUser); filterChain.doFilter(request, response); } finally { // 關閉Scope endScope(); } } }
注入Filter
@Slf4j @Configuration public class FilterConfig { @Bean public FilterRegistrationBean<ScopeFilter> scopeFilterRegistration() { FilterRegistrationBean<ScopeFilter> registration = new FilterRegistrationBean<>(); registration.setFilter(new ScopeFilter()); registration.addUrlPatterns("/rest/*"); registration.setOrder(0); log.info("scope filter registered"); return registration; } }
UserController.java
@Slf4j @RestController @RequestMapping("/rest") public class UserController { // curl --location --request GET 'localhost:8080/rest/getLoginUser' --header 'Cookie: login_user=zhangsan' @GetMapping("/getLoginUser") public String getLoginUser() { return AuthScope.getLoginUser(); } // curl --location --request GET 'localhost:8080/rest/getLoginUserInThreadPool' --header 'Cookie: login_user=zhangsan' @GetMapping("/getLoginUserInThreadPool") public String getLoginUserInThreadPool() { ScopeThreadPoolExecutor executor = ScopeThreadPoolExecutor.newFixedThreadPool(4); executor.execute(() -> { log.info("get login user in thread pool: {}", AuthScope.getLoginUser()); }); return AuthScope.getLoginUser(); } }
通過以下請求驗證,請求線程和線程池線程是否能獲取登錄態(tài),其中登錄態(tài)通過Cookie模擬:
curl --location --request GET 'localhost:8080/rest/getLoginUser' --header 'Cookie: login_user=zhangsan' curl --location --request GET 'localhost:8080/rest/getLoginUserInThreadPool' --header 'Cookie: login_user=zhangsan'
5 總結
源代碼
github:https://github.com/pengchengSU/demo-request-scope
以上就是利用ThreadLocal實現一個上下文管理組件的詳細內容,更多關于ThreadLocal上下文管理組件的資料請關注腳本之家其它相關文章!
相關文章
JAVA匿名內部類(Anonymous Classes)的具體使用
本文主要介紹了JAVA匿名內部類,匿名內部類在我們JAVA程序員的日常工作中經常要用到,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08關于JSONObject.toJSONString出現地址引用問題
這篇文章主要介紹了關于JSONObject.toJSONString出現地址引用問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03詳解JavaEE 使用 Redis 數據庫進行內容緩存和高訪問負載
本篇文章主要介紹了JavaEE 使用 Redis 數據庫進行內容緩存和高訪問負載,具有一定的參考價值,有興趣的可以了解一下2017-08-08