異步線程traceId如何實現(xiàn)傳遞
前言
在日常問題排查中,我們經(jīng)常在ELK中根據(jù)traceId來查詢請求的日志鏈路,在同步請求中,根據(jù)traceId一站到底,很爽,那如果是異步請求該如何處理呢?
項目中的異步請求都是結(jié)合線程池來開啟異步線程,下面結(jié)合slf4j中的MDC和線程池來實現(xiàn)異步線程的traceId傳遞。
重寫ThreadPoolTaskExecutor中方法
下面的工具類,分別在Callable和Runnable異步任務(wù)執(zhí)行前通過MDC.setContextMap(context)設(shè)置請求映射上下文
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* @desc: 定義MDC工具類,支持Runnable和Callable兩種,目的就是為了把父線程的traceId設(shè)置給子線程
*/
public class MdcUtil {
public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
return () -> {
if (CollectionUtils.isEmpty(context)) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
return callable.call();
} finally {
// 清除子線程的,避免內(nèi)存溢出,就和ThreadLocal.remove()一個原因
MDC.clear();
}
};
}
public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return () -> {
if (CollectionUtils.isEmpty(context)) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
MDC.clear();
}
};
}
}下面定義一個ThreadPoolMdcExecutor 類來繼承ThreadPoolTaskExecutor 類,重寫execute和submit方法
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* @desc: 把當前的traceId透傳到子線程特意加的實現(xiàn)。
* 重點就是 MDC.getCopyOfContextMap(),此方法獲取當前線程(父線程)的traceId
*/
public class ThreadPoolMdcExecutor extends ThreadPoolTaskExecutor {
@Override
public void execute(Runnable task) {
super.execute(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(MdcUtil.wrap(task, MDC.getCopyOfContextMap()));
}
}下面定義線程池,就可以使用ThreadPoolMdcExecutor
@Bean(name = "callBackExecutorConfig")
public Executor callBackExecutorConfig() {
ThreadPoolTaskExecutor executor = new ThreadPoolMdcExecutor();
// 配置核心線程數(shù)
executor.setCorePoolSize(10);
// 配置最大線程數(shù)
executor.setMaxPoolSize(20);
// 配置隊列大小
executor.setQueueCapacity(200);
// 配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("async-Thread-");
// rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務(wù)
// abort:在調(diào)用executor執(zhí)行的方法中拋出異常 RejectedExecutionException
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 執(zhí)行初始化
executor.initialize();
return executor;
}定義好線程池之后,我們就可以使用callBackExecutorConfig線程池進行異步任務(wù),避免異步線程中的traceId丟失。
線程池增強
上面是通過繼承ThreadPoolTaskExecutor來,重寫execute和submit方法,設(shè)置MDC.setContextMap(context)設(shè)置上下文,我們也可以通過實現(xiàn)TaskDecorator 接口來增強線程池
public class ContextTransferTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
Map<String, String> context = MDC.getCopyOfContextMap();
RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
return () -> {
try {
MDC.setContextMap(context);
RequestContextHolder.setRequestAttributes(requestAttributes);
runnable.run();
} finally {
MDC.clear();
RequestContextHolder.resetRequestAttributes();
}
};
}
}接下來,定義線程池,對線程池進行增強
@Bean(name = "callBackExecutorConfig")
public Executor callBackExecutorConfig() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
// 配置核心線程數(shù)
executor.setCorePoolSize(10);
// 配置最大線程數(shù)
executor.setMaxPoolSize(20);
// 配置隊列大小
executor.setQueueCapacity(200);
// 配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("async-Thread-");
// rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務(wù)
// abort:在調(diào)用executor執(zhí)行的方法中拋出異常 RejectedExecutionException
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//線程池增強
threadPoolTaskExecutor.setTaskDecorator(new ContextTransferTaskDecorator());
// 執(zhí)行初始化
executor.initialize();
return executor;
}總結(jié)
上面兩種方式其實本質(zhì)都是通過Mdc來進行異步線程間的traceId同步,可以看下Mdc的源碼,最終還是通過InheritableThreadLocal來實現(xiàn)子線程獲取父線程信息
public class BasicMDCAdapter implements MDCAdapter {
private InheritableThreadLocal<Map<String, String>> inheritableThreadLocal =
new InheritableThreadLocal<Map<String, String>>() {
protected Map<String, String> childValue(Map<String, String> parentValue) {
return parentValue == null ? null : new HashMap(parentValue);
}
};
//省略若干
......
}以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
java:程序包org.apache.ibatis.annotations不存在報錯解決
這篇文章主要給大家介紹了關(guān)于java:程序包org.apache.ibatis.annotations不存在報錯的解決方法,這個錯誤是我在直接導入springboot項目的時候報錯的,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2023-04-04
Java靜態(tài)static與實例instance方法示例
這篇文章主要為大家介紹了Java靜態(tài)static與實例instance方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08
jackson json序列化實現(xiàn)首字母大寫,第二個字母需小寫
這篇文章主要介紹了jackson json序列化實現(xiàn)首字母大寫,第二個字母需小寫方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06

