Java線程池流程編排運用實戰(zhàn)源碼
引導語
在線程池的面試中,面試官除了喜歡問 ThreadPoolExecutor 的底層源碼外,還喜歡問你有沒有在實際的工作中用過 ThreadPoolExecutor,我們在并發(fā)集合類的《場景集合:并發(fā) List、Map 的應用場景》一文中說過一種簡單的流程引擎,如果沒有看過的同學,可以返回去看一下。
本章就在流程引擎的基礎(chǔ)上運用 ThreadPoolExecutor,使用線程池實現(xiàn) SpringBean 的異步執(zhí)行。
1、流程引擎關(guān)鍵代碼回顧
《場景集合:并發(fā) List、Map 的應用場景》文中流程引擎執(zhí)行 SpringBean 的核心代碼為:
// 批量執(zhí)行 Spring Bean private void stageInvoke(String flowName, StageEnum stage, FlowContent content) { List<DomainAbilityBean> domainAbilitys = FlowCenter.flowMap.getOrDefault(flowName, Maps.newHashMap()).get(stage); if (CollectionUtils.isEmpty(domainAbilitys)) { throw new RuntimeException("找不到該流程對應的領(lǐng)域行為" + flowName); } for (DomainAbilityBean domainAbility : domainAbilitys) { // 執(zhí)行 Spring Bean domainAbility.invoke(content); } }
入?yún)⑹?flowName(流程名稱)、stage(階段)、content(上下文),其中 stage 中會執(zhí)行很多 SpringBean,SpringBean 被執(zhí)行的代碼是 domainAbility.invoke(content)。
2、異步執(zhí)行 SpringBean
從上述代碼中,我們可以看到所有的 SpringBean 都是串行執(zhí)行的,效率較低,我們在實際業(yè)務(wù)中發(fā)現(xiàn),有的 SpringBean 完全可以異步執(zhí)行,這樣既能完成業(yè)務(wù)請求,又能減少業(yè)務(wù)處理的 rt,對于這個需求,我們條件反射的有了兩個想法:
需要新開線程來異步執(zhí)行 SpringBean,可以使用 Runable 或者 Callable;業(yè)務(wù)請求量很大,我們不能每次來一個請求,就開一個線程,我們應該讓線程池來管理異步執(zhí)行的線程。
于是我們決定使用線程池來完成這個需求。
3、如何區(qū)分異步的 SpringBean
我們的 SpringBean 都是實現(xiàn) DomainAbilityBean 這個接口的,接口定義如下:
public interface DomainAbilityBean { /** * 領(lǐng)域行為的方法入口 */ FlowContent invoke(FlowContent content); }
從接口定義上來看,沒有預留任何地方來標識該 SpringBean 應該是同步執(zhí)行還是異步執(zhí)行,這時候我們可以采取注解的方式,我們新建一個注解,只要 SpringBean 上有該注解,表示該 SpringBean 應該異步執(zhí)行,否則應該同步執(zhí)行,新建的注解如下:
/** * 異步 SpringBean 執(zhí)行注解 * SpringBean 需要異步執(zhí)行的話,就打上該注解 */ @Target(ElementType.TYPE)// 表示該注解應該打在類上 @Retention(RetentionPolicy.RUNTIME) @Documented public @interface AsyncComponent { }
接著我們新建了兩個 SpringBean,并在其中一個 SpringBean 上打上異步的注解,并且打印出執(zhí)行 SpringBean 的線程名稱,如下圖:
圖中實現(xiàn)了兩個 SpringBean:BeanOne 和 BeanTwo,其中 BeanTwo 被打上了 AsyncComponent 注解,表明 BeanTwo 應該被異步執(zhí)行,兩個 SpringBean 都打印出執(zhí)行的線程的名稱。
4、mock 流程引擎數(shù)據(jù)中心
《場景集合:并發(fā) List、Map 的應用場景》一文中,我們說可以從數(shù)據(jù)庫中加載出流程引擎需要的數(shù)據(jù),此時我們 mock 一下,mock 的代碼如下:
@Component public class FlowCenter { /** * flowMap 是共享變量,方便訪問 */ public static final Map<String, Map<StageEnum, List<DomainAbilityBean>>> flowMap = Maps.newConcurrentMap(); /** * PostConstruct 注解的意思就是 * 在容器啟動成功之后,初始化 flowMap */ @PostConstruct public void init() { // 初始化 flowMap mock Map<StageEnum, List<DomainAbilityBean>> stageMap = flowMap.getOrDefault("flow1",Maps.newConcurrentMap()); for (StageEnum value : StageEnum.values()) { List<DomainAbilityBean> domainAbilitys = stageMap.getOrDefault(value, Lists.newCopyOnWriteArrayList()); if(CollectionUtils.isEmpty(domainAbilitys)){ domainAbilitys.addAll(ImmutableList.of( ApplicationContextHelper.getBean(BeanOne.class), ApplicationContextHelper.getBean(BeanTwo.class) )); stageMap.put(value,domainAbilitys); } } flowMap.put("flow1",stageMap); // 打印出加載完成之后的數(shù)據(jù)結(jié)果 log.info("init success,flowMap is {}", JSON.toJSONString(flowMap)); } }
5、新建線程池
在以上操作完成之后,只剩下最后一步了,之前我們執(zhí)行 SpringBean 時,是這行代碼:domainAbility.invoke(content);
現(xiàn)在我們需要區(qū)分 SpringBean 是否是異步的,如果是異步的,丟到線程池中去執(zhí)行,如果是同步的,仍然使用原來的方法進行執(zhí)行,于是我們把這些邏輯封裝到一個工具類中,工具類如下:
public class ComponentExecutor { // 我們新建了一個線程池 private static ExecutorService executor = new ThreadPoolExecutor(15, 15, 365L, TimeUnit.DAYS, new LinkedBlockingQueue<>()); // 如果 SpringBean 上有 AsyncComponent 注解,表示該 SpringBean 需要異步執(zhí)行,就丟到線程池中去 public static final void run(DomainAbilityBean component, FlowContent content) { // 判斷類上是否有 AsyncComponent 注解 if (AnnotationUtils.isAnnotationPresent(AsyncComponent.class, AopUtils.getTargetClass(component))) { // 提交到線程池中 executor.submit(() -> { component.invoke(content); }); return; } // 同步 SpringBean 直接執(zhí)行。 component.invoke(content); } }
我們把原來的執(zhí)行代碼替換成使用組件執(zhí)行器執(zhí)行,如下圖:
6、測試
以上步驟完成之后,簡單的流程引擎就已經(jīng)完成了,我們簡單地在項目啟動的時候加上測試,代碼如下:
更嚴謹?shù)淖龇ǎ菚憜卧獪y試來測試流程引擎,為了快一點,我們直接在項目啟動類上加上了測試代碼。
運行之后的關(guān)鍵結(jié)果如下:
[main] demo.sixth.SynchronizedDemo: SynchronizedDemo init begin [main] demo.sixth.SynchronizedDemo: SynchronizedDemo init end [main] demo.three.flow.FlowCenter : init success,flowMap is {"flow1":{"PARAM_VALID":[{},{}],"AFTER_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"BUSINESS_VALID":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}],"IN_TRANSACTION":[{"$ref":"$.flow1.PARAM_VALID[0]"},{"$ref":"$.flow1.PARAM_VALID[1]"}]}} o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup [main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080 (http) [main] demo.DemoApplication : Started DemoApplication in 5.377 seconds (JVM running for 6.105) [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-1] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-1 [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-2] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-2 [pool-1-thread-3] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-3 [main] demo.three.flow.BeanOne : BeanOne is run,thread name is main [pool-1-thread-4] demo.three.flow.BeanTwo : BeanTwo is run,thread name is pool-1-thread-4
從運行結(jié)果中,我們可以看到 BeanTwo 已經(jīng)被多個不同的線程異步執(zhí)行了。
7、總結(jié)
這是一個線程池在簡單流程引擎上的運用實站,雖然這個流程引擎看起來比較簡單,但在實際工作中,還是非常好用的,大家可以把代碼拉下來,自己嘗試一下,調(diào)試一下參數(shù),比如當我新增 SpringBean 的時候,流程引擎的表現(xiàn)如何。
以上就是Java線程池流程編排運用實戰(zhàn)源碼的詳細內(nèi)容,更多關(guān)于Java線程池流程編排運用的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
IDEA遇到Internal error. Please refer to http://jb. gg/ide/crit
這篇文章主要介紹了IDEA遇到Internal error. Please refer to http://jb. gg/ide/critical-startup-errors的問題及解決辦法,本文通過圖文并茂的形式給大家介紹的非常詳細,需要的朋友可以參考下2020-08-08SpringBoot?AOP統(tǒng)一處理Web請求日志的示例代碼
springboot有很多方法處理日志,例如攔截器,aop切面,service中代碼記錄等,下面這篇文章主要給大家介紹了關(guān)于SpringBoot?AOP統(tǒng)一處理Web請求日志的相關(guān)資料,需要的朋友可以參考下2023-02-02Java String轉(zhuǎn)換時為null的解決方法
這篇文章主要介紹了Java String轉(zhuǎn)換時為null的解決方法,需要的朋友可以參考下2017-07-07