Java中parallelStream().forEach()的踩坑日記
前言
最近大聰明一直在開發(fā)項目中的新需求,其中有一個需求是“解析文件(.txt文件,一行就是一條數(shù)據(jù))中的數(shù)據(jù)并進行入庫操作”。其實這個需求也很簡單,無非就是將文件中每一行數(shù)據(jù)轉(zhuǎn)換為一個對象,將每一個對象都存儲到 list 集合中,最終執(zhí)行批量入庫的操作。但就是這么一個簡單的需求卻讓我踩了一個大坑....
踩坑日記
各位小伙伴先看一下上圖中的代碼,不知道各位小伙伴有沒有看出什么問題呢??? 可能這么看起來有些不好理解,咱們再簡化一下圖中的代碼,如下所示:??
public static void main(String[] args) { List<Integer> list = new ArrayList<>(); for (int i = 0; i < 10000; i++) { list.add(i); } System.out.println("a:"+list.size()); List<Integer> streamList = new ArrayList<>(); list.parallelStream().forEach(streamList::add); System.out.println("b:"+streamList.size()); } }
各位小伙伴看看簡化后的代碼,猜測一下 a 和 b 輸出的值分別是多少呢?這里就不賣關子了,咱們直接揭曉答案 ??
結(jié)果可能和大多數(shù)小伙伴猜測的都不太一樣,a 和 b 的值居然不相等,且 b 的值 永遠都會小于 a,同時在多次執(zhí)行之后可能會出現(xiàn)數(shù)組下標越界異常,顯然這里的代碼是不符合邏輯的??
這也是我在項目中遇倒的問題所在,解析完文件后,通過 parallelStream().forEach() 遍歷結(jié)果進行處理,但是最終入庫的數(shù)據(jù)條數(shù)總是小于文件中的數(shù)據(jù)條數(shù)。
這種情況大聰明還是第一次見到,不過卻又一次激起了大聰明的求知欲,開啟了刨根問底模式~
刨根問底
經(jīng)過大聰明的一番探索,也是終于找到了問題答案... ??
Stream(流)是 JDK8 中引入的一種類似與迭代器(Iterator)的單向迭代訪問數(shù)據(jù)的工具。ParallelStream 則是并行的流,它通過 Fork/Join 框架來拆分任務,加速流的處理過程。Fork/Join 的框架是通過把一個大任務不斷 fork 成許多子任務,然后多線程執(zhí)行這些子任務,最后再 Join 這些子任務得到最終結(jié)果。咱們回到實例代碼中來解釋一下,就是先將 list 集合 fork 成多段,然后多線程添加到 streamList 的結(jié)合中,而 streamList 是ArrayList 類型,ArrayList 的 add() 方法并不能保證原子性。
咱們先看一下 ArrayList 中 add() 方法的源碼??
眾所周知,ArrayList 作為 Collection 中極重要的一員,是非線程安全的,所以 ArrayList 并不適合多線程高并發(fā)的情況,在多線程高并發(fā)時會出現(xiàn)內(nèi)部某些位置為 null 的情況。核心原因是,ArrayList 的add() 的方法不是線程安全的,是非原子性的,add操作可以簡單理解為兩個步驟:
- ensureCapacityInternal(size + 1) :確認當前 ArrayList 中的數(shù)組是否還可以加入新的元素。如果不行,就會再申請一個:int newCapacity = oldCapacity + (oldCapacity >> 1) 大小的數(shù)組(即容量變?yōu)樵瓉淼?1.5 倍),然后將數(shù)據(jù)復制過去。
- elementData[size++] = e:將元素添加到 elementData 數(shù)組中。
那么在多線程高并發(fā)情況下,如果有A、B兩個線程同時執(zhí)行 add() 方法,在第一步校驗數(shù)組容量時,A、B線程都發(fā)現(xiàn)當前無需擴容,還可以繼續(xù)添加一個元素;因此A、B線程都進入了第二步,此時,A線程先執(zhí)行完,數(shù)組容量已滿,然后B線程再對 elementData 賦值時,就會出現(xiàn)我們上面說到的情況,要么是數(shù)據(jù)丟失,要么是拋出數(shù)組下標越界的異常。
解決方案
問題原因我們已經(jīng)找到了,那么問題的解決方案也就呼之欲出了~ ??
- ?? 方案一:將 parallelStream 改成 stream,或者直接使用 foreach 遍歷處理。也就是放棄多線程的寫法,改為傳統(tǒng)的單線程處理。
- ?? 方案二:使用 list = new CopyOnWriteArrayList<>(); 這是個線程安全的類。從源碼上看,CopyOnWriteArrayList 在 add 操作時,通過 ReentrantLock 進行加鎖,防止并發(fā)寫。但是每次 add 操作都是把原數(shù)組中的元素拷貝一份到新數(shù)組中,然后在新數(shù)組中添加新元素,最后再把引用指向新數(shù)組,這也就會頻繁的創(chuàng)建數(shù)組(千萬別忘了數(shù)組需要一塊連續(xù)的內(nèi)存空間)。所以當實際業(yè)務邏輯中存在大量 add 操作時,要謹慎使用 CopyOnWriteArrayList 。
- ?? 方案三:使用包裝類 list = Collections.synchronizedList(Arrays.asList());
我們在使用 parallelStream 之前,一定要仔細思考一下自己的業(yè)務邏輯是否真的需要多線程并發(fā)處理。其實在實際應用場景中,并不是所有的問題都適合使用并發(fā)來解決,比如當數(shù)據(jù)量不大時,順序執(zhí)行往往比并行執(zhí)行更快,畢竟準備線程池和其它相關資源也是需要時間的。但是,當任務涉及到 I/O 操作并且任務之間不互相依賴時,那么并行化就是一個不錯的選擇。
小結(jié)
到此這篇關于Java中parallelStream().forEach()的踩坑日記的文章就介紹到這了,更多相關Java parallelStream().forEach()內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
使用Netty實現(xiàn)類似Dubbo的遠程接口調(diào)用的實現(xiàn)方法
本文介紹了如何使用Netty框架實現(xiàn)類似Dubbo的遠程接口調(diào)用,通過自定義編解碼器、通信協(xié)議和服務注冊中心等實現(xiàn)遠程通信和服務治理。文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2023-04-04Java?Dubbo服務調(diào)用擴展點Filter使用教程
Dubbo是阿里巴巴公司開源的一個高性能優(yōu)秀的服務框架,使得應用可通過高性能的RPC實現(xiàn)服務的輸出和輸入功能,可以和Spring框架無縫集成2022-12-12解決SpringCloud Config結(jié)合github無法讀取配置的問題
這篇文章主要介紹了解決SpringCloud Config結(jié)合github無法讀取配置的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02