Java?多線程并發(fā)編程提高數(shù)據(jù)處理效率的詳細過程
??工作場景中遇到這樣一個需求:根據(jù)主機的 IP 地址聯(lián)動更新其他模型的相關信息。需求很簡單,只涉及一般的數(shù)據(jù)庫聯(lián)動查詢以及更新操作,然而在編碼實現(xiàn)過程中發(fā)現(xiàn),由于主機的數(shù)量很多,導致循環(huán)遍歷查詢、更新時花費很長的時間,調(diào)用一次接口大概需要 30-40 min 時間才能完成操作。
??因此,為了有效縮短接口方法的執(zhí)行時間,便考慮使用多線程并發(fā)編程方法,利用多核處理器并行執(zhí)行的能力,通過異步處理數(shù)據(jù)的方式,便可以大大縮短執(zhí)行時間,提高執(zhí)行效率。
??這里使用可重用固定線程數(shù)的線程池 FixedThreadPool
,并使用 CountDownLatch
并發(fā)工具類提供的并發(fā)流程控制工具作為配合使用,保證多線程并發(fā)編程過程中的正常運行:
- 首先,通過
Runtime.getRuntime().availableProcessors()
方法獲取運行機器的 CPU 線程數(shù),用于后續(xù)設置固定線程池的線程數(shù)量。 - 其次,判斷任務的特性,如果為計算密集型任務則設置線程數(shù)為
CPU 線程數(shù)+1
,如果為 IO 密集型任務則設置線程數(shù)為2 * CPU 線程數(shù)
,由于在方法中需要與數(shù)據(jù)庫進行頻繁的交互,因此屬于 IO 密集型任務。 - 之后,對數(shù)據(jù)進行分組切割,每個線程處理一個分組的數(shù)據(jù),分組的組數(shù)與線程數(shù)保持一致,并且還要創(chuàng)建計數(shù)器對象
CountDownLatch
,調(diào)用構(gòu)造函數(shù),初始化參數(shù)值為線程數(shù)個數(shù),保證主線程等待所有子線程運行結(jié)束后,再進行后續(xù)的操作。 - 然后,調(diào)用
executorService.execute()
方法,重寫run
方法編寫業(yè)務邏輯與數(shù)據(jù)處理代碼,執(zhí)行完當前線程后記得將計數(shù)器減1操作。 - 最后,當所有子線程執(zhí)行完成后,關閉線程池。
?在省略工作場景中的業(yè)務邏輯代碼后,通用的處理方法示例如下所示:
public ResponseData updateHostDept() { // ... List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host"); // split the hostMapList for the following multi-threads task // return the number of logical CPUs int processorsNum = Runtime.getRuntime().availableProcessors(); // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks, // if Computing Tasks set the threadNum as (the number of logical CPUs) + 1 int threadNum = processorsNum * 2; // the number of each group data int eachGroupNum = hostMapList.size() / threadNum; List<List<Map>> groupList = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { int start = i * eachGroupNum; if (i == threadNum - 1) { int end = mapList.size(); groupList.add(hostMapList.subList(start, end)); } else { int end = (i+1) * eachGroupNum; groupList.add(hostMapList.subList(start, end)); } } // update data by using multi-threads asynchronously ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (List<Map> group : groupList) { executorService.execute(()->{ try { for (Map map : group) { // update the data in mongodb } } catch (Exception e) { e.printStackTrace(); } finally { // let counter minus one countDownLatch.countDown(); } }); } try { // main thread donnot execute until all child threads finish countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } // remember to shutdown the threadPool executorService.shutdown(); return ResponseData.success(); }
??那么在使用多線程異步更新的策略后,從當初調(diào)用接口所需的大概時間為 30-40 min
下降到了 8-10 min
,大大提高了執(zhí)行效率。
??需要注意的是,這里使用的
newFixedThreadPool
創(chuàng)建線程池,它有一個缺陷就是,它的阻塞隊列默認是一個無界隊列,默認值為Integer.MAX_VALUE
極有可能會造成 OOM 問題。因此,一般可以使用ThreadPoolExecutor
來創(chuàng)建線程池,自己可以指定等待隊列中的線程個數(shù),避免產(chǎn)生 OOM 問題。
public ResponseData updateHostDept() { // ... List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host"); // split the hostMapList for the following multi-threads task // return the number of logical CPUs int processorsNum = Runtime.getRuntime().availableProcessors(); // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks, // if Computing Tasks set the threadNum as (the number of logical CPUs) + 1 int threadNum = processorsNum * 2; // the number of each group data int eachGroupNum = hostMapList.size() / threadNum; List<List<Map>> groupList = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { int start = i * eachGroupNum; if (i == threadNum - 1) { int end = mapList.size(); groupList.add(hostMapList.subList(start, end)); } else { int end = (i+1) * eachGroupNum; groupList.add(hostMapList.subList(start, end)); } } // update data by using multi-threads asynchronously ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (List<Map> group : groupList) { executor.execute(()->{ try { for (Map map : group) { // update the data in mongodb } } catch (Exception e) { e.printStackTrace(); } finally { // let counter minus one countDownLatch.countDown(); } }); } try { // main thread donnot execute until all child threads finish countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } // remember to shutdown the threadPool executor.shutdown(); return ResponseData.success(); }
在上述的代碼中,核心線程數(shù)和最大線程數(shù)分別為 5 和 8,并沒有設置的很大的值,因為如果如果設置的很大,線程間頻繁的上下文切換也會增加時間消耗,反而不能最大程度上發(fā)揮多線程的優(yōu)勢。至于如何選擇合適的參數(shù),需要根據(jù)機器的參數(shù)以及任務的類型綜合考慮決定。
??最后補充一點,如果想要通過非編碼的方式獲取機器的 CPU 線程個數(shù)也很簡單,windows 系統(tǒng)通過任務管理器,選擇 “性能”,便可以查看 CPU 線程個數(shù)的情況,如下圖所示:
??從上圖可以看到,我的機器中內(nèi)核是八個 CPU,但是通過超線程技術一個物理的 CPU 核心可以模擬成兩個邏輯 CPU 線程,因此我的機器是支持8核16線程的。
到此這篇關于Java 多線程并發(fā)編程提高數(shù)據(jù)處理效率的文章就介紹到這了,更多相關Java 多線程提高數(shù)據(jù)處理效率內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
JAVA8 List<List<Integer>> list中再裝一個list轉(zhuǎn)成一個list操
這篇文章主要介紹了JAVA8 List<List<Integer>> list中再裝一個list轉(zhuǎn)成一個list操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08maven 在執(zhí)行package,install,deploy時使用clean與不使用clean的不同之處
有時候用mvn install后,新改的內(nèi)容不生效,一定要后來使用mvn clean install 才生效,由于之前沒有做記錄,以及記不清是什么情況下才會出現(xiàn)的問題,于是想看看clean和不clean的區(qū)別,感興趣的朋友跟隨小編一起看看吧2021-08-08spring?boot?validation參數(shù)校驗與分組嵌套各種類型及使用小結(jié)
參數(shù)校驗基本上是controller必做的事情,畢竟前端傳過來的一切都不可信,validation可以簡化這一操作,這篇文章主要介紹了spring?boot?validation參數(shù)校驗分組嵌套各種類型及使用小結(jié),需要的朋友可以參考下2023-09-09Java數(shù)據(jù)結(jié)構(gòu)學習之樹
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)學習之樹,文中有非常詳細的代碼示例,對正在學習java數(shù)據(jù)結(jié)構(gòu)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-05-05