Java利用多線程和分塊實現(xiàn)快速讀取文件
背景
在工作中經(jīng)常會有接收文件并且讀取落庫的需求,讀取方式都是串行讀取,即一行行的讀取,如果文件小還可以,但是如果文件比較大,類似于全量文件的話,這樣的讀取就會非常效率低。
本文主要介紹的是如何正確的將文件分塊,多線程的實現(xiàn)方式有多種,這里用的是CompletableFuture
方法
因為我們文件里面的每條數(shù)據(jù)之間沒有任何依賴關(guān)系也不存在順序要求。如何提高讀取速度,第一個想到當然就是并行讀取文件,并行讀取的前提就是要給文件分塊,讓每個線程只讀取對應分塊的數(shù)據(jù),先看看我們的文件格式
可以看見我們的文件格式每一行的長度不一,同時文件也無法像TCP通過指定數(shù)據(jù)體的長度來讀取數(shù)據(jù),所以如何能正確的分塊是整個方法的關(guān)鍵,如果分多或者分少了就會導致數(shù)據(jù)讀取錯誤的可能。
可以看見錯誤的分塊就會導致我們讀取的數(shù)據(jù)會被截取掉一部分,截取掉多少都是隨機的。這里我們用的方法是用填充來讓每個分塊都是正確的。具體來說就是我們在分塊的時候,判斷一下當前的分塊位置會不會導致數(shù)據(jù)被截取,因為我們的數(shù)據(jù)是一行行的,所以最好的分塊位置都是分在了行尾。如果說當前的分塊位置是在一行的中間的話,那我們就要移動這個分塊的位置到這行的行尾去
private static int THREAD_NUM = 5; long total = file.length(); long chunkSize = total < THREAD_NUM ? total : total / THREAD_NUM;
先確定要用幾個線程并行讀取,然后根據(jù)線程數(shù)和文件的大小來確定每一塊的大小,接下來就進行判斷是否需要填充
private static long padding(long start, long chunkSize, File file) { try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")){ randomAccessFile.seek(start + chunkSize); boolean eol = false; //判斷當前的位置是否需要填充,如果當前沒有數(shù)據(jù)或者是行尾,則不需要填充 switch (randomAccessFile.read()) { case -1: case '\n': eol = true; break; case '\r': eol = true; break; default: break; } //如果符合填充條件,對其進行填充,首先是讀取一行數(shù)據(jù),然后計算出這行數(shù)據(jù)的長度,然后將這行數(shù)據(jù)的長度加上前面讀取了一字節(jié),然后將這些長度加到chunkSize上 if (!eol){ String readLine = randomAccessFile.readLine(); chunkSize += readLine.getBytes().length; //加上前面讀取的一字節(jié) chunkSize += 1; } } catch (Exception e) { throw new RuntimeException(e); } return chunkSize; }
如何填充的可以看看代碼注釋,是參照了readline的實現(xiàn)思路。 經(jīng)過填充后就可以確保每塊的分塊的最后一個位置都是在行尾。將每個分塊的起始位置和分塊大小儲存起來再結(jié)合上CompletableFuture就可以多線程分塊讀取文件了
Map<Long, Long> chunkMap = new HashMap<>(); for (int i = 0; i < THREAD_NUM; i++) { chunkSize = padding(start, chunkSize, file); chunkMap.put(start, chunkSize); start += chunkSize; } CompletableFuture.allOf(chunkMap.entrySet().stream().map(entry -> CompletableFuture.runAsync( () -> handlerReportTreeBaseData(entry.getKey(), entry.getValue()))).toArray(CompletableFuture[]::new)) .exceptionally(throwable -> { System.out.println(throwable.getMessage()); return null; }).join();
完整實現(xiàn)
接下來給出整個的實現(xiàn)代碼,歡迎大家看看有沒有什么我沒有考慮到的,有可能的隱藏BUG和還能優(yōu)化改善的地方,歡迎討論
public class SpiltFIle { private static int THREAD_NUM = 5; private static void splitChunks() { File file = new File("test.txt"); long total = file.length(); long chunkSize = total < THREAD_NUM ? total : total / THREAD_NUM; long start = 0; Map<Long, Long> chunkMap = new HashMap<>(); for (int i = 0; i < THREAD_NUM; i++) { chunkSize = padding(start, chunkSize, file); handlerReportTreeBaseData(start, chunkSize); chunkMap.put(start, chunkSize); start += chunkSize; } CompletableFuture.allOf(chunkMap.entrySet().stream().map(entry -> CompletableFuture.runAsync( () -> handlerReportTreeBaseData(entry.getKey(), entry.getValue()))).toArray(CompletableFuture[]::new)) .exceptionally(throwable -> { System.out.println(throwable.getMessage()); return null; }).join(); } private static long padding(long start, long chunkSize, File file) { try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")){ randomAccessFile.seek(start + chunkSize); boolean eol = false; //判斷當前的位置是否需要填充,如果當前沒有數(shù)據(jù)或者是行尾,則不需要填充 switch (randomAccessFile.read()) { case -1: case '\n': eol = true; break; case '\r': eol = true; break; default: break; } //如果符合填充條件,對其進行填充,首先是讀取一行數(shù)據(jù),然后計算出這行數(shù)據(jù)的長度,然后將這行數(shù)據(jù)的長度加上前面讀取了一字節(jié),然后將這些長度加到chunkSize上 if (!eol){ String readLine = randomAccessFile.readLine(); chunkSize += readLine.getBytes().length; chunkSize += 1; //加上前面讀取的一字節(jié) } } catch (Exception e) { throw new RuntimeException(e); } return chunkSize; } private static void handlerReportTreeBaseData(long start, long chunkSize) { try (RandomAccessFile randomAccessFile = new RandomAccessFile("test.txt", "r")) { randomAccessFile.seek(start); long currentCount = 0L; String line; while (currentCount < chunkSize && (line = randomAccessFile.readLine()) != null){ if (!line.isEmpty()){ currentCount += line.getBytes().length + System.lineSeparator().getBytes().length; System.out.println(line); } } }catch (Exception ignored){ } } public static void main(String[] args) throws IOException { SpiltFIle.splitChunks(); } }
最后也是能正常的讀取完文件
以上就是Java利用多線程和分塊實現(xiàn)快速讀取文件的詳細內(nèi)容,更多關(guān)于Java讀取文件的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringMVC解析JSON請求數(shù)據(jù)問題解析
這篇文章主要介紹了SpringMVC解析JSON請求數(shù)據(jù)問題解析,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-04-04java 啟動exe程序,傳遞參數(shù)和獲取參數(shù)操作
這篇文章主要介紹了java 啟動exe程序,傳遞參數(shù)和獲取參數(shù)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01Spring的RedisTemplate存儲的key和value有特殊字符的處理
這篇文章主要介紹了Spring的RedisTemplate存儲的key和value有特殊字符的處理方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12關(guān)于RowBounds分頁原理、RowBounds的坑記錄
這篇文章主要介紹了關(guān)于RowBounds分頁原理、RowBounds的坑記錄,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04