springboot使用ThreadPoolTaskExecutor多線程批量插入百萬級(jí)數(shù)據(jù)的實(shí)現(xiàn)方法
前言
開發(fā)目的:提高百萬級(jí)數(shù)據(jù)插入效率。
采取方案:利用ThreadPoolTaskExecutor多線程批量插入。
采用技術(shù):springboot2.1.1+mybatisPlus3.0.6+swagger2.5.0+Lombok1.18.4+postgresql+ThreadPoolTaskExecutor等。
具體實(shí)現(xiàn)細(xì)節(jié)
application-dev.properties添加線程池配置信息
# 異步線程配置 # 配置核心線程數(shù) async.executor.thread.core_pool_size = 30 # 配置最大線程數(shù) async.executor.thread.max_pool_size = 30 # 配置隊(duì)列大小 async.executor.thread.queue_capacity = 99988 # 配置線程池中的線程的名稱前綴 async.executor.thread.name.prefix = async-importDB-
spring容器注入線程池bean對(duì)象
@Configuration @EnableAsync @Slf4j public class ExecutorConfig { @Value("${async.executor.thread.core_pool_size}") private int corePoolSize; @Value("${async.executor.thread.max_pool_size}") private int maxPoolSize; @Value("${async.executor.thread.queue_capacity}") private int queueCapacity; @Value("${async.executor.thread.name.prefix}") private String namePrefix; @Bean(name = "asyncServiceExecutor") public Executor asyncServiceExecutor() { log.warn("start asyncServiceExecutor"); //在這里修改 ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(corePoolSize); //配置最大線程數(shù) executor.setMaxPoolSize(maxPoolSize); //配置隊(duì)列大小 executor.setQueueCapacity(queueCapacity); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix(namePrefix); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } }
創(chuàng)建異步線程 業(yè)務(wù)類
@Service @Slf4j public class AsyncServiceImpl implements AsyncService { @Override @Async("asyncServiceExecutor") public void executeAsync(List<LogOutputResult> logOutputResults, LogOutputResultMapper logOutputResultMapper, CountDownLatch countDownLatch) { try{ log.warn("start executeAsync"); //異步線程要做的事情 logOutputResultMapper.addLogOutputResultBatch(logOutputResults); log.warn("end executeAsync"); }finally { countDownLatch.countDown();// 很關(guān)鍵, 無論上面程序是否異常必須執(zhí)行countDown,否則await無法釋放 } } }
創(chuàng)建多線程批量插入具體業(yè)務(wù)方法
@Override public int testMultiThread() { List<LogOutputResult> logOutputResults = getTestData(); //測(cè)試每100條數(shù)據(jù)插入開一個(gè)線程 List<List<LogOutputResult>> lists = ConvertHandler.splitList(logOutputResults, 100); CountDownLatch countDownLatch = new CountDownLatch(lists.size()); for (List<LogOutputResult> listSub:lists) { asyncService.executeAsync(listSub, logOutputResultMapper,countDownLatch); } try { countDownLatch.await(); //保證之前的所有的線程都執(zhí)行完成,才會(huì)走下面的; // 這樣就可以在下面拿到所有線程執(zhí)行完的集合結(jié)果 } catch (Exception e) { log.error("阻塞異常:"+e.getMessage()); } return logOutputResults.size(); }
模擬2000003 條數(shù)據(jù)進(jìn)行測(cè)試
多線程 測(cè)試 2000003 耗時(shí)如下:耗時(shí)1.67分鐘
本次開啟30個(gè)線程,截圖如下:
單線程測(cè)試2000003 耗時(shí)如下:耗時(shí)5.75分鐘
檢查多線程入庫的數(shù)據(jù),檢查是否存在重復(fù)入庫的問題:
根據(jù)id分組,查看是否有id重復(fù)的數(shù)據(jù),通過sql語句檢查,沒有發(fā)現(xiàn)重復(fù)入庫的問題
檢查數(shù)據(jù)完整性: 通過sql語句查詢,多線程錄入數(shù)據(jù)完整
測(cè)試結(jié)果
不同線程數(shù)測(cè)試:
總結(jié)
通過以上測(cè)試案列,同樣是導(dǎo)入2000003 條數(shù)據(jù),多線程耗時(shí)1.67分鐘,單線程耗時(shí)5.75分鐘。通過對(duì)不同線程數(shù)的測(cè)試,發(fā)現(xiàn)不是線程數(shù)越多越好,具體多少合適,網(wǎng)上有一個(gè)不成文的算法:
CPU核心數(shù)量*2 +2 個(gè)線程。
附:測(cè)試電腦配置
到此這篇關(guān)于springboot使用ThreadPoolTaskExecutor多線程批量插入百萬級(jí)數(shù)據(jù)的實(shí)現(xiàn)方法的文章就介紹到這了,更多相關(guān)springboot ThreadPoolTaskExecutor多線程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?Boot?使用觀察者模式實(shí)現(xiàn)實(shí)時(shí)庫存管理的步驟
在現(xiàn)代軟件開發(fā)中,實(shí)時(shí)數(shù)據(jù)處理非常關(guān)鍵,本文提供了一個(gè)使用SpringBoot和觀察者模式開發(fā)實(shí)時(shí)庫存管理系統(tǒng)的詳細(xì)教程,步驟包括創(chuàng)建項(xiàng)目、定義實(shí)體類、實(shí)現(xiàn)觀察者模式、集成Spring框架、創(chuàng)建RESTful?API端點(diǎn)和測(cè)試應(yīng)用等,這將有助于開發(fā)者構(gòu)建能夠即時(shí)響應(yīng)庫存變化的系統(tǒng)2024-09-09深入理解Java動(dòng)態(tài)代理與靜態(tài)代理
這篇文章主要介紹了深入理解Java動(dòng)態(tài)代理與靜態(tài)代理,靜態(tài)代理,代理類和被代理的類實(shí)現(xiàn)了同樣的接口,代理類同時(shí)持有被代理類的引用,動(dòng)態(tài)代理的根據(jù)實(shí)現(xiàn)方式的不同可以分為JDK動(dòng)態(tài)代理和CGlib動(dòng)態(tài)代理2022-06-06IDEA最新激活碼2021(IDEA2020.3.2最新永久激活方法)
這篇文章主要介紹了IDEA最新激活碼2021(IDEA2020.3.2最新永久激活方法),本文通過實(shí)例圖文相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12MyBatisPlus+Spring實(shí)現(xiàn)聲明式事務(wù)的方法實(shí)現(xiàn)
本文主要介紹了MyBatisPlus+Spring實(shí)現(xiàn)聲明式事務(wù)的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-07-07實(shí)現(xiàn)一個(gè)規(guī)則引擎的可視化具體方案
項(xiàng)目原因需要用到規(guī)則引擎,但是發(fā)現(xiàn)大部分不可以自由的進(jìn)行規(guī)則定義,通過不斷嘗試變換關(guān)鍵字在搜索引擎搜索,最終在stackoverflow找到了一個(gè)探討這個(gè)問題的帖子,特此將帖子中提到的方案分享一下,如果你跟我一樣在研究同樣的問題,也許對(duì)你有用2021-04-04Eclipse插件開發(fā)實(shí)現(xiàn)控制臺(tái)輸出信息的方法
今天小編就為大家分享一篇關(guān)于Eclipse插件開發(fā)實(shí)現(xiàn)控制臺(tái)輸出信息的方法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-01-01帶你了解如何使用Spring基于ProxyFactoryBean創(chuàng)建AOP代理
這篇文章主要介紹了Spring基于ProxyFactoryBean創(chuàng)建AOP代理,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-08-08