Node.js多文件Stream合并,串行和并發(fā)兩種模式的實(shí)現(xiàn)方式
將多個文件合并為一個文件,常見的場景是類似于大文件分片上傳,事先根據(jù)一定的文件大小拆分為多個小文件上傳到服務(wù)端,最后服務(wù)端在合并起來。
怎么合并?一種簡單的辦法是使用 fs.readFile 讀取,fs.writeFile 追加寫入,這種方式是將文件數(shù)據(jù)先讀入應(yīng)用內(nèi)存再寫入,不是很推薦,Node.js 本身提供了 Stream 模塊可以更好的處理這種場景。
在 Stream 中合并文件之前一個比較常用的 api 是 pipe,但是這個 API 對于錯誤處理不是很友好,一不小心還能搞出文件句柄內(nèi)存泄漏問題。
本文先介紹 pipe 方法的使用及什么情況下會遇到文件句柄的內(nèi)存泄漏問題,之后再分別介紹 Stream 合并的兩種實(shí)現(xiàn)模式。
pipe VS pipeline
pipe
創(chuàng)建一個可讀流 readable 和一個可寫流 writeable,通過管道 pipe 將可寫流綁定到可讀流,一個簡單的 Stream 操作就完成了。
const fs = require('fs'); const readable = fs.createReadStream('./test1.txt'); const writeable = fs.createWriteStream('./test2.txt'); readable.pipe(writeable);
pipe 方法的兩個參數(shù):
destination:是一個可寫流對象,也就是一個數(shù)據(jù)寫入的目標(biāo)對象,例如,上面我們創(chuàng)建的 writeable 就是一個可寫流對象
options:
- end:讀取結(jié)束時終止寫入流,默認(rèn)值是 true
readable.pipe(destination[, options])
默認(rèn)情況下我們是不需要手動調(diào)用寫入流的 end 方法關(guān)閉的。
現(xiàn)在我們改一下, 設(shè)置 end 為 false 寫入的目標(biāo)流將會一直處于打開狀態(tài), 此時就需要監(jiān)聽可讀流的 end 事件,結(jié)束之后手動調(diào)用可寫流的 end 方法結(jié)束( 為什么要這樣做?下文 Stream 串行合并會再用到這一特性 )。
// readable.pipe(writeable); readable.pipe(writeable, { end: false, }); readable.on('end', function() { writeable.end('結(jié)束'); });
還需要注意一點(diǎn) 如果可讀流期間發(fā)生什么錯誤,則寫入的目標(biāo)流將不會關(guān)閉 ,例如:process.stderr 和 process.stdout 可寫流在 Nodejs 進(jìn)程退出前將永遠(yuǎn)不會關(guān)閉,所以 需要監(jiān)聽錯誤事件,手動關(guān)閉可寫流,防止內(nèi)存泄漏 。
Linux 下一切皆文件,為了測試,在創(chuàng)建可讀流時,你可以不創(chuàng)建 test1.txt 文件,讓可讀流自動觸發(fā) error 事件并且將 writeable 的 close 方法注釋掉,通過 linux 命令 ls -l /proc/${pid}/fd 查看 error 和非 error 前后的文件句柄變化。
readable.on('error', function(err) { console.log('error', err); // writeable.close(); }); console.log(process.pid); // 打印進(jìn)程 ID setInterval(function(){}, 5000) // 讓程序不中斷,進(jìn)程不退出
以下為觸發(fā) error 錯誤下 test2.txt 這個文件 fd 將會一直打開,除非進(jìn)程退出,所以重要的事情再說一遍, 如果使用 pipe 一定要做好錯誤監(jiān)聽手動關(guān)閉每個寫入流 ,以防止 “ 內(nèi)存泄漏 ”。
...
l-wx------ 1 root root 64 Apr 10 15:47 19 -> /root/study/test2.txt
...
注意,Mac 下沒有 /proc 文件,可通過 docker 測試。
不想開兩個終端的,也可以在程序 setInterval 定時器函數(shù)里使用 child_process 模塊的 exec 函數(shù)執(zhí)行 ls -l /proc/${process.pid}/fd 命令。
const { exec } = require('child_process'); setInterval(function(){ exec(`ls -l /proc/${process.pid}/fd`, (error, stdout, stderr) => { console.log(`stdout: \n`, stdout); }) }, 5000) // 讓程序不中斷,進(jìn)程不退出
pipeline
Stream 模塊的一個新 API pipeline 方法,添加于 Node.js v10.0,Promise 風(fēng)格需要 Node.js v15.0+ 支持。
相比較于 pipe 方法增加了錯誤處理機(jī)制,當(dāng)管道中的某個流發(fā)生錯誤,它會自動處理并釋放掉相應(yīng)的資源。
try { await pipeline( readable, writable ); console.log('Pipeline succeeded.'); } catch (err) { console.log('error', err); }
串行模式 Stream 合并
使用 pipe 方法實(shí)現(xiàn)串行模式的流合并,根據(jù)前面講的,設(shè)置可讀流的 end 為 false 保持寫入流一直處于打開狀態(tài),直到所有的可讀流結(jié)束(待合并的文件完成后),我們再將可寫流給關(guān)閉。
- streamMerge 函數(shù)為入口函數(shù)
- streamMergeRecursive 函數(shù)遞歸調(diào)用合并文件
const fs = require('fs'); const path = require('path'); /** * Stream 合并 * @param { String } sourceFileDirectory 源文件目錄 * @param { String } targetFile 目標(biāo)文件 */ function streamMerge(sourceFileDirectory, targetFile) { const scripts = fs.readdirSync(path.resolve(__dirname, sourceFileDirectory)); // 獲取源文件目錄下的所有文件 const fileWriteStream = fs.createWriteStream(path.resolve(__dirname, targetFile)); // 創(chuàng)建一個可寫流 // fs.readdir 讀取出來的結(jié)果,根據(jù)具體的規(guī)則做下排序,防止因?yàn)轫樞虿粚?dǎo)致最終合并之后的文件無效。 return streamMergeRecursive(scripts, fileWriteStream, sourceFileDirectory); } /** * Stream 合并的遞歸調(diào)用 * @param { Array } scripts * @param { Stream } fileWriteStream */ function streamMergeRecursive(scripts=[], fileWriteStream, sourceFileDirectory) { // 遞歸到尾部情況判斷 if (!scripts.length) { return fileWriteStream.end("console.log('Stream 合并完成')"); // 最后關(guān)閉可寫流,防止內(nèi)存泄漏 } const currentFile = path.resolve(__dirname, sourceFileDirectory, scripts.shift()); const currentReadStream = fs.createReadStream(currentFile); // 獲取當(dāng)前的可讀流 currentReadStream.pipe(fileWriteStream, { end: false }); currentReadStream.on('end', function() { streamMergeRecursive(scripts, fileWriteStream, sourceFileDirectory); }); currentReadStream.on('error', function(error) { // 監(jiān)聽錯誤事件,關(guān)閉可寫流,防止內(nèi)存泄漏 console.error(error); fileWriteStream.close(); }); } streamMerge('./files', './file.js');
并發(fā)模式 Stream 合并
流合并也是可以采用并發(fā)模式的,核心是通過可寫流的 start、end 屬性控制。
start 有點(diǎn)類似于數(shù)據(jù)庫查詢的 skip,在 計算時要求文件分塊的下標(biāo)必須是 0、1、2... 這樣的規(guī)則 ,這種方式可以不用關(guān)注每一個流分塊在文件中的存儲順序,也可以將可讀流傳輸至可寫流的指定位置。
例如,有一個大文件 dec47b76e3220432100a1155eff7f402(文件 md5 后的 hash 值) 根據(jù) chunkSize(1048576)拆分為 3 個小文件。
/chunks └── dec47b76e3220432100a1155eff7f402-1048576 ├── dec47b76e3220432100a1155eff7f402-0 ├── dec47b76e3220432100a1155eff7f402-1 └── dec47b76e3220432100a1155eff7f402-2
并發(fā)模式的 Stream 合并代碼實(shí)現(xiàn)如下:
/** * Stream concurrent merge * @param {String} sourceFileDirectory * @param {String} targetFile * @param {Number} chunkSize */ export const streamConcurrentMerge = async (sourceFileDirectory, targetFile, chunkSize) => { const filenames = await fs.readdir(sourceFileDirectory); await Promise.all(filenames.map(filename => { const index = filename.split('-').pop(); const start = index * chunkSize; const end = (index + 1) * chunkSize; return pipeline( createReadStream(path.join(sourceFileDirectory, filename)), createWriteStream(targetFile, { start, end, }) ); })) }
總結(jié)
使用 pipe 時錯誤處理是件需要注意的事情,特別是出現(xiàn)這種情況 readable.pipe(a).pipe(b).pipe(writable) 其中任何一個流關(guān)閉或出錯都會導(dǎo)致整個管道停止工作,這個時候就要銷毀所有的流,這種復(fù)雜的處理起來極其麻煩, 推薦使用 stream API pipeline 處理,或使用社區(qū) npm 庫 pump。
將多個文件合并為一個文件,使用流的方式有兩種:
- 第一種是串行模式依次讀取每個文件的內(nèi)容,通過 pipe 方法寫入可寫流,直到最后一個文件讀取完成關(guān)閉寫入流。
- 另一種是并發(fā)模式,核心實(shí)現(xiàn)是利用寫入流的 start、end 屬性將可讀流傳輸至可寫流的指定位置,上面的實(shí)現(xiàn)還可以在優(yōu)化,比如控制下并發(fā)的數(shù)量。
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
mac安裝nvm(node.js)多版本管理實(shí)踐步驟
這篇文章主要介紹了mac安裝nvm(node.js)多版本管理的相關(guān)資料,NVM是一個用于管理多個Node.js版本的命令行工具,它允許開發(fā)者在同一臺機(jī)器上安裝、切換和卸載不同版本的Node.js,從而解決版本不兼容的問題,需要的朋友可以參考下2025-02-02nodejs構(gòu)建本地web測試服務(wù)器 如何解決訪問靜態(tài)資源問題
這篇文章主要為大家詳細(xì)介紹了nodejs構(gòu)建本地web測試服務(wù)器,教大家如何解決訪問靜態(tài)資源問題,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-07-07