亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Node.js多文件Stream合并,串行和并發(fā)兩種模式的實(shí)現(xiàn)方式

 更新時間:2023年10月27日 09:15:46   作者:高先生的貓  
這篇文章主要介紹了Node.js多文件Stream合并,串行和并發(fā)兩種模式的實(shí)現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

將多個文件合并為一個文件,常見的場景是類似于大文件分片上傳,事先根據(jù)一定的文件大小拆分為多個小文件上傳到服務(wù)端,最后服務(wù)端在合并起來。

怎么合并?一種簡單的辦法是使用 fs.readFile 讀取,fs.writeFile 追加寫入,這種方式是將文件數(shù)據(jù)先讀入應(yīng)用內(nèi)存再寫入,不是很推薦,Node.js 本身提供了 Stream 模塊可以更好的處理這種場景。

在 Stream 中合并文件之前一個比較常用的 api 是 pipe,但是這個 API 對于錯誤處理不是很友好,一不小心還能搞出文件句柄內(nèi)存泄漏問題。

本文先介紹 pipe 方法的使用及什么情況下會遇到文件句柄的內(nèi)存泄漏問題,之后再分別介紹 Stream 合并的兩種實(shí)現(xiàn)模式。

pipe VS pipeline

pipe

創(chuàng)建一個可讀流 readable 和一個可寫流 writeable,通過管道 pipe 將可寫流綁定到可讀流,一個簡單的 Stream 操作就完成了。

const fs = require('fs');
const readable = fs.createReadStream('./test1.txt');
const writeable = fs.createWriteStream('./test2.txt');
 
readable.pipe(writeable);

pipe 方法的兩個參數(shù):

destination:是一個可寫流對象,也就是一個數(shù)據(jù)寫入的目標(biāo)對象,例如,上面我們創(chuàng)建的 writeable 就是一個可寫流對象

options:

  • end:讀取結(jié)束時終止寫入流,默認(rèn)值是 true
readable.pipe(destination[, options])

默認(rèn)情況下我們是不需要手動調(diào)用寫入流的 end 方法關(guān)閉的。

現(xiàn)在我們改一下, 設(shè)置 end 為 false 寫入的目標(biāo)流將會一直處于打開狀態(tài), 此時就需要監(jiān)聽可讀流的 end 事件,結(jié)束之后手動調(diào)用可寫流的 end 方法結(jié)束( 為什么要這樣做?下文 Stream 串行合并會再用到這一特性 )。

// readable.pipe(writeable);
 
readable.pipe(writeable, {
  end: false,
});
readable.on('end', function() {
  writeable.end('結(jié)束');
});

還需要注意一點(diǎn) 如果可讀流期間發(fā)生什么錯誤,則寫入的目標(biāo)流將不會關(guān)閉 ,例如:process.stderr 和 process.stdout 可寫流在 Nodejs 進(jìn)程退出前將永遠(yuǎn)不會關(guān)閉,所以 需要監(jiān)聽錯誤事件,手動關(guān)閉可寫流,防止內(nèi)存泄漏 。

Linux 下一切皆文件,為了測試,在創(chuàng)建可讀流時,你可以不創(chuàng)建 test1.txt 文件,讓可讀流自動觸發(fā) error 事件并且將 writeable 的 close 方法注釋掉,通過 linux 命令 ls -l /proc/${pid}/fd 查看 error 和非 error 前后的文件句柄變化。

readable.on('error', function(err) {
  console.log('error', err);
  // writeable.close();
});
 
console.log(process.pid); // 打印進(jìn)程 ID
setInterval(function(){}, 5000) // 讓程序不中斷,進(jìn)程不退出

以下為觸發(fā) error 錯誤下 test2.txt 這個文件 fd 將會一直打開,除非進(jìn)程退出,所以重要的事情再說一遍, 如果使用 pipe 一定要做好錯誤監(jiān)聽手動關(guān)閉每個寫入流 ,以防止 “ 內(nèi)存泄漏 ”。

...
l-wx------ 1 root root 64 Apr 10 15:47 19 -> /root/study/test2.txt
...

注意,Mac 下沒有 /proc 文件,可通過 docker 測試。

不想開兩個終端的,也可以在程序 setInterval 定時器函數(shù)里使用 child_process 模塊的 exec 函數(shù)執(zhí)行 ls -l /proc/${process.pid}/fd 命令。

const { exec } = require('child_process');
 
setInterval(function(){
  exec(`ls -l /proc/${process.pid}/fd`, (error, stdout, stderr) => {
    console.log(`stdout: \n`, stdout);
  })
}, 5000) // 讓程序不中斷,進(jìn)程不退出

pipeline

Stream 模塊的一個新 API pipeline 方法,添加于 Node.js v10.0,Promise 風(fēng)格需要 Node.js  v15.0+ 支持。

相比較于 pipe 方法增加了錯誤處理機(jī)制,當(dāng)管道中的某個流發(fā)生錯誤,它會自動處理并釋放掉相應(yīng)的資源。

try {
  await pipeline(
    readable,
    writable
  );
  console.log('Pipeline succeeded.');
} catch (err) {
  console.log('error', err);
}

串行模式 Stream 合并

使用 pipe 方法實(shí)現(xiàn)串行模式的流合并,根據(jù)前面講的,設(shè)置可讀流的 end 為 false 保持寫入流一直處于打開狀態(tài),直到所有的可讀流結(jié)束(待合并的文件完成后),我們再將可寫流給關(guān)閉。

  • streamMerge 函數(shù)為入口函數(shù)
  • streamMergeRecursive 函數(shù)遞歸調(diào)用合并文件
const fs = require('fs');
const path = require('path');
 
/**
 * Stream 合并
 * @param { String } sourceFileDirectory 源文件目錄
 * @param { String } targetFile 目標(biāo)文件
 */
function streamMerge(sourceFileDirectory, targetFile) {
  const scripts =  fs.readdirSync(path.resolve(__dirname, sourceFileDirectory)); // 獲取源文件目錄下的所有文件
  const fileWriteStream = fs.createWriteStream(path.resolve(__dirname, targetFile)); // 創(chuàng)建一個可寫流
 
  // fs.readdir 讀取出來的結(jié)果,根據(jù)具體的規(guī)則做下排序,防止因?yàn)轫樞虿粚?dǎo)致最終合并之后的文件無效。
  
  return streamMergeRecursive(scripts, fileWriteStream, sourceFileDirectory);
}
 
/**
 * Stream 合并的遞歸調(diào)用
 * @param { Array } scripts
 * @param { Stream } fileWriteStream
 */
function streamMergeRecursive(scripts=[], fileWriteStream, sourceFileDirectory) {
  // 遞歸到尾部情況判斷
  if (!scripts.length) {
    return fileWriteStream.end("console.log('Stream 合并完成')"); // 最后關(guān)閉可寫流,防止內(nèi)存泄漏
  }
 
  const currentFile = path.resolve(__dirname, sourceFileDirectory, scripts.shift());
  const currentReadStream = fs.createReadStream(currentFile); // 獲取當(dāng)前的可讀流
 
  currentReadStream.pipe(fileWriteStream, { end: false });
  currentReadStream.on('end', function() {
    streamMergeRecursive(scripts, fileWriteStream, sourceFileDirectory);
  });
 
  currentReadStream.on('error', function(error) { // 監(jiān)聽錯誤事件,關(guān)閉可寫流,防止內(nèi)存泄漏
    console.error(error);
    fileWriteStream.close();
  });
}
 
streamMerge('./files', './file.js');

并發(fā)模式 Stream 合并

流合并也是可以采用并發(fā)模式的,核心是通過可寫流的 start、end 屬性控制。

start 有點(diǎn)類似于數(shù)據(jù)庫查詢的 skip,在 計算時要求文件分塊的下標(biāo)必須是 0、1、2... 這樣的規(guī)則 ,這種方式可以不用關(guān)注每一個流分塊在文件中的存儲順序,也可以將可讀流傳輸至可寫流的指定位置。

例如,有一個大文件 dec47b76e3220432100a1155eff7f402(文件 md5 后的 hash 值) 根據(jù) chunkSize(1048576)拆分為 3 個小文件。

/chunks
└── dec47b76e3220432100a1155eff7f402-1048576
    ├── dec47b76e3220432100a1155eff7f402-0
    ├── dec47b76e3220432100a1155eff7f402-1
    └── dec47b76e3220432100a1155eff7f402-2

并發(fā)模式的 Stream 合并代碼實(shí)現(xiàn)如下:

/**
 * Stream concurrent merge
 * @param {String} sourceFileDirectory
 * @param {String} targetFile
 * @param {Number} chunkSize
 */
export const streamConcurrentMerge = async (sourceFileDirectory, targetFile, chunkSize) => {
  const filenames = await fs.readdir(sourceFileDirectory);
  
  await Promise.all(filenames.map(filename => {
    const index = filename.split('-').pop();
    const start = index * chunkSize;
    const end = (index + 1) * chunkSize;
 
    return pipeline(
      createReadStream(path.join(sourceFileDirectory, filename)),
      createWriteStream(targetFile, {
        start,
        end,
      })
    );
  }))
}

總結(jié)

使用 pipe 時錯誤處理是件需要注意的事情,特別是出現(xiàn)這種情況 readable.pipe(a).pipe(b).pipe(writable) 其中任何一個流關(guān)閉或出錯都會導(dǎo)致整個管道停止工作,這個時候就要銷毀所有的流,這種復(fù)雜的處理起來極其麻煩, 推薦使用 stream API pipeline 處理,或使用社區(qū) npm 庫 pump。

將多個文件合并為一個文件,使用流的方式有兩種:

  • 第一種是串行模式依次讀取每個文件的內(nèi)容,通過 pipe 方法寫入可寫流,直到最后一個文件讀取完成關(guān)閉寫入流。
  • 另一種是并發(fā)模式,核心實(shí)現(xiàn)是利用寫入流的 start、end 屬性將可讀流傳輸至可寫流的指定位置,上面的實(shí)現(xiàn)還可以在優(yōu)化,比如控制下并發(fā)的數(shù)量。

以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • Node.js 異步異常的處理與domain模塊解析

    Node.js 異步異常的處理與domain模塊解析

    本篇文章主要介紹了Node.js 異步異常的處理與domain模塊解析,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-05-05
  • Nodejs核心模塊之net和http的使用詳解

    Nodejs核心模塊之net和http的使用詳解

    net和http模塊都是node核心模塊之一,他們都可以搭建自己的服務(wù)端和客戶端,以響應(yīng)請求和發(fā)送請求。這篇文章主要介紹了Nodejs核心模塊之net和http的使用詳解,感興趣的小伙伴們可以參考一下
    2019-04-04
  • pm2工具在Node.js開發(fā)部署中的重要作用詳解

    pm2工具在Node.js開發(fā)部署中的重要作用詳解

    這篇文章主要為大家介紹了pm2工具在Node.js開發(fā)部署中的重要作用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-12-12
  • pm2啟動ssr失敗的解決方法

    pm2啟動ssr失敗的解決方法

    這篇文章主要介紹了pm2啟動ssr失敗的解決方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-06-06
  • mac安裝nvm(node.js)多版本管理實(shí)踐步驟

    mac安裝nvm(node.js)多版本管理實(shí)踐步驟

    這篇文章主要介紹了mac安裝nvm(node.js)多版本管理的相關(guān)資料,NVM是一個用于管理多個Node.js版本的命令行工具,它允許開發(fā)者在同一臺機(jī)器上安裝、切換和卸載不同版本的Node.js,從而解決版本不兼容的問題,需要的朋友可以參考下
    2025-02-02
  • nodejs構(gòu)建本地web測試服務(wù)器 如何解決訪問靜態(tài)資源問題

    nodejs構(gòu)建本地web測試服務(wù)器 如何解決訪問靜態(tài)資源問題

    這篇文章主要為大家詳細(xì)介紹了nodejs構(gòu)建本地web測試服務(wù)器,教大家如何解決訪問靜態(tài)資源問題,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-07-07
  • ExpressJS入門實(shí)例

    ExpressJS入門實(shí)例

    這篇文章主要介紹了ExpressJS入門實(shí)例,本文講解了創(chuàng)建項目、進(jìn)入目錄、安裝項目依賴的包、創(chuàng)建應(yīng)用程序、運(yùn)行程序等內(nèi)容,需要的朋友可以參考下
    2015-01-01
  • Node.js 深度調(diào)試方法解析

    Node.js 深度調(diào)試方法解析

    這篇文章主要介紹了Node.js 深度調(diào)試方法解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07
  • node.js中的console.dir方法使用說明

    node.js中的console.dir方法使用說明

    這篇文章主要介紹了node.js中的console.dir方法使用說明,本文介紹了console.dir的方法說明、語法、接收參數(shù)、使用實(shí)例和實(shí)現(xiàn)源碼,需要的朋友可以參考下
    2014-12-12
  • Node.js 異步編程之 Callback介紹(一)

    Node.js 異步編程之 Callback介紹(一)

    這篇文章主要介紹了Node.js 異步編程之 Callback介紹(一),本文用實(shí)例講解Callback的相關(guān)知識,本文是第一篇,下一篇小編會跟進(jìn),需要的朋友可以參考下
    2015-03-03

最新評論