node.js中stream流中可讀流和可寫流的實(shí)現(xiàn)與使用方法實(shí)例分析
本文實(shí)例講述了node.js中stream流中可讀流和可寫流的實(shí)現(xiàn)與使用方法。分享給大家供大家參考,具體如下:
node.js中的流 stream 是處理流式數(shù)據(jù)的抽象接口。node.js 提供了很多流對象,像http中的request和response,和 process.stdout 都是流的實(shí)例。
流可以是 可讀的,可寫的,或是可讀可寫的。所有流都是 events 的實(shí)例。
一、流的類型
node.js中有四種基本流類型:
1、Writable 可寫流 (例:fs.createWriteStream() )
2、Readable 可讀流 (例:fs.createReadStream() )
3、Duplex 可讀又可寫流 (例:net.Socket )
4、Transform 讀寫過程中可修改或轉(zhuǎn)換數(shù)據(jù)的 Duplex 流 (例:zlib.createDeflate() )
二、流中的數(shù)據(jù)有兩種模式
1、二進(jìn)制模式,都是 string字符串 和 Buffer。
2、對象模式,流內(nèi)部處理的是一系統(tǒng)普通對象。
三、可讀流的兩種模式
1、流動(dòng)模式 ( flowing ) ,數(shù)據(jù)自動(dòng)從系統(tǒng)底層讀取,并通過事件,盡可能快地提供給應(yīng)用程序。
2、暫停模式 ( paused ),必須顯式的調(diào)用 read() 讀取數(shù)據(jù)。
可讀流 都開始于暫停模式,可以通過如下方法切換到流動(dòng)模式:
1、添加 'data' 事件回調(diào)。
2、調(diào)用 resume()。
3、調(diào)用 pipe()。
可讀流通過如下方法切換回暫停模式:
1、如果沒有管道目標(biāo),調(diào)用 pause()。
2、如果有管道目標(biāo),移除所有管道目標(biāo),調(diào)用 unpipe() 移除多個(gè)管道目標(biāo)。
四、創(chuàng)建可讀流,并監(jiān)聽事件
const fs = require('fs'); //創(chuàng)建一個(gè)文件可讀流 let rs = fs.createReadStream('./1.txt', { //文件系統(tǒng)標(biāo)志 flags: 'r', //數(shù)據(jù)編碼,如果調(diào)置了該參數(shù),則讀取的數(shù)據(jù)會(huì)自動(dòng)解析 //如果沒調(diào)置,則讀取的數(shù)據(jù)會(huì)是 Buffer //也可以通過 rs.setEncoding() 進(jìn)行設(shè)置 encoding: 'utf8', //文件描述符,默認(rèn)為null fd: null, //文件權(quán)限 mode: 0o666, //文件讀取的開始位置 start: 0, //文件讀取的結(jié)束位置(包括結(jié)束位置) end: Infinity, //讀取緩沖區(qū)的大小,默認(rèn)64K highWaterMark: 3 }); //文件被打開時(shí)觸發(fā) rs.on('open', function () { console.log('文件打開'); }); //監(jiān)聽data事件,會(huì)讓當(dāng)前流切換到流動(dòng)模式 //當(dāng)流中將數(shù)據(jù)傳給消費(fèi)者后觸發(fā) //由于我們在上面配置了 highWaterMark 為 3字節(jié),所以下面會(huì)打印多次。 rs.on('data', function (data) { console.log(data); }); //流中沒有數(shù)據(jù)可供消費(fèi)者時(shí)觸發(fā) rs.on('end', function () { console.log('數(shù)據(jù)讀取完畢'); }); //讀取數(shù)據(jù)出錯(cuò)時(shí)觸發(fā) rs.on('error', function () { console.log('讀取錯(cuò)誤'); }); //當(dāng)文件被關(guān)閉時(shí)觸發(fā) rs.on('close', function () { console.log('文件關(guān)閉'); });
注意,'open' 和 'close' 事件并不是所有流都會(huì)觸發(fā)。
當(dāng)們監(jiān)聽'data'事件后,系統(tǒng)會(huì)盡可能快的讀取出數(shù)據(jù)。但有時(shí)候,我們需要暫停一下流的讀取,操作其他事情。
這時(shí)候就需要用到 pause() 和 resume() 方法。
const fs = require('fs'); //創(chuàng)建一個(gè)文件可讀流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); rs.on('data', function (data) { console.log(`讀取了 ${data.length} 字節(jié)數(shù)據(jù) : ${data.toString()}`); //使流動(dòng)模式的流停止觸發(fā)'data'事件,切換出流動(dòng)模式,數(shù)據(jù)都會(huì)保留在內(nèi)部緩存中。 rs.pause(); //等待3秒后,再恢復(fù)觸發(fā)'data'事件,將流切換回流動(dòng)模式。 setTimeout(function () { rs.resume(); }, 3000); });
可讀流的 'readable' 事件,當(dāng)流中有數(shù)據(jù)可供讀取時(shí)就觸發(fā)。
注意當(dāng)監(jiān)聽 'readable' 事件后,會(huì)導(dǎo)致流停止流動(dòng),需調(diào)用 read() 方法讀取數(shù)據(jù)。
注意 on('data'),on('readable'),pipe() 不要混合使用,會(huì)導(dǎo)致不明確的行為。
const fs = require('fs'); let rs = fs.createReadStream('./1.txt', { highWaterMark: 1 }); //當(dāng)流中有數(shù)據(jù)可供讀取時(shí)就觸發(fā) rs.on('readable', function () { let data; //循環(huán)讀取數(shù)據(jù) //參數(shù)表示要讀取的字節(jié)數(shù) //如果可讀的數(shù)據(jù)不足字節(jié)數(shù),則返回緩沖區(qū)剩余數(shù)據(jù) //如是沒有指定字節(jié)數(shù),則返回緩沖區(qū)中所有數(shù)據(jù) while (data = rs.read()) { console.log(`讀取到 ${data.length} 字節(jié)數(shù)據(jù)`); console.log(data.toString()); } });
五、創(chuàng)建可寫流,并監(jiān)聽事件
const fs = require('fs'); //創(chuàng)建一個(gè)文件可寫流 let ws = fs.createWriteStream('./1.txt', { highWaterMark: 3 }); //往流中寫入數(shù)據(jù) //參數(shù)一表示要寫入的數(shù)據(jù) //參數(shù)二表示編碼方式 //參數(shù)三表示寫入成功的回調(diào) //緩沖區(qū)滿時(shí)返回false,未滿時(shí)返回true。 //由于上面我們設(shè)置的緩沖區(qū)大小為 3字節(jié),所以到寫入第3個(gè)時(shí),就返回了false。 console.log(ws.write('1', 'utf8')); console.log(ws.write('2', 'utf8')); console.log(ws.write('3', 'utf8')); console.log(ws.write('4', 'utf8')); function writeData() { let cnt = 9; return function () { let flag = true; while (cnt && flag) { flag = ws.write(`${cnt}`); console.log('緩沖區(qū)中寫入的字節(jié)數(shù)', ws.writableLength); cnt--; } }; } let wd = writeData(); wd(); //當(dāng)緩沖區(qū)中的數(shù)據(jù)滿的時(shí)候,應(yīng)停止寫入數(shù)據(jù), //一旦緩沖區(qū)中的數(shù)據(jù)寫入文件了,并清空了,則會(huì)觸發(fā) 'drain' 事件,告訴生產(chǎn)者可以繼續(xù)寫數(shù)據(jù)了。 ws.on('drain', function () { console.log('可以繼續(xù)寫數(shù)據(jù)了'); console.log('緩沖區(qū)中寫入的字節(jié)數(shù)', ws.writableLength); wd(); }); //當(dāng)流或底層資源關(guān)閉時(shí)觸發(fā) ws.on('close', function () { console.log('文件被關(guān)閉'); }); //當(dāng)寫入數(shù)據(jù)出錯(cuò)時(shí)觸發(fā) ws.on('error', function () { console.log('寫入數(shù)據(jù)錯(cuò)誤'); });
寫入流的 end() 方法 和 'finish' 事件監(jiān)聽
const fs = require('fs'); //創(chuàng)建一個(gè)文件可寫流 let ws = fs.createWriteStream('./1.txt', { highWaterMark: 3 }); //往流中寫入數(shù)據(jù) //參數(shù)一表示要寫入的數(shù)據(jù) //參數(shù)二表示編碼方式 //參數(shù)三表示寫入成功的回調(diào) //緩沖區(qū)滿時(shí)返回false,未滿時(shí)返回true。 //由于上面我們設(shè)置的緩沖區(qū)大小為 3字節(jié),所以到寫入第3個(gè)時(shí),就返回了false。 console.log(ws.write('1', 'utf8')); console.log(ws.write('2', 'utf8')); console.log(ws.write('3', 'utf8')); console.log(ws.write('4', 'utf8')); //調(diào)用end()表明已經(jīng)沒有數(shù)據(jù)要被寫入,在關(guān)閉流之前再寫一塊數(shù)據(jù)。 //如果傳入了回調(diào)函數(shù),則將作為 'finish' 事件的回調(diào)函數(shù) ws.end('最后一點(diǎn)數(shù)據(jù)', 'utf8'); //調(diào)用 end() 且緩沖區(qū)數(shù)據(jù)都已傳給底層系統(tǒng)時(shí)觸發(fā) ws.on('finish', function () { console.log('寫入完成'); });
寫入流的 cork() 和 uncork() 方法,主要是為了解決大量小塊數(shù)據(jù)寫入時(shí),內(nèi)部緩沖可能失效,導(dǎo)致的性能下降。
const fs = require('fs'); let ws = fs.createWriteStream('./1.txt', { highWaterMark: 1 }); //調(diào)用 cork() 后,會(huì)強(qiáng)制把所有寫入的數(shù)據(jù)緩沖到內(nèi)存中。 //不會(huì)因?yàn)閷懭氲臄?shù)據(jù)超過了 highWaterMark 的設(shè)置而寫入到文件中。 ws.cork(); ws.write('1'); console.log(ws.writableLength); ws.write('2'); console.log(ws.writableLength); ws.write('3'); console.log(ws.writableLength); //將調(diào)用 cork() 后的緩沖數(shù)據(jù)都輸出到目標(biāo),也就是寫入文件中。 ws.uncork();
注意 cork() 的調(diào)用次數(shù)要與 uncork() 一致。
const fs = require('fs'); let ws = fs.createWriteStream('./1.txt', { highWaterMark: 1 }); //調(diào)用一次 cork() 就應(yīng)該寫一次 uncork(),兩者要一一對應(yīng)。 ws.cork(); ws.write('4'); ws.write('5'); ws.cork(); ws.write('6'); process.nextTick(function () { //注意這里只調(diào)用了一次 uncork() ws.uncork(); //只有調(diào)用同樣次數(shù)的 uncork() 數(shù)據(jù)才會(huì)被輸出。 ws.uncork(); });
六、可讀流的 pipe() 方法
pipe() 方法類似下面的代碼,在可讀流與可寫流之前架起一座橋梁。
const fs = require('fs'); //創(chuàng)建一個(gè)可讀流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); //創(chuàng)建一個(gè)可寫流 let ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 }); rs.on('data', function (data) { let flag = ws.write(data); console.log(`往可寫流中寫入 ${data.length} 字節(jié)數(shù)據(jù)`); //如果寫入緩沖區(qū)已滿,則暫停可讀流的讀取 if (!flag) { rs.pause(); console.log('暫??勺x流'); } }); //監(jiān)控可讀流數(shù)據(jù)是否讀完 rs.on('end', function () { console.log('數(shù)據(jù)已讀完'); //如果可讀流讀完了,則調(diào)用 end() 表示可寫流已寫入完成 ws.end(); }); //如果可寫流緩沖區(qū)已清空,可以再次寫入,則重新打開可讀流 ws.on('drain', function () { rs.resume(); console.log('重新開啟可讀流'); });
我們用 pipe() 方法完成上面的功能。
const fs = require('fs'); //創(chuàng)建一個(gè)可讀流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); //創(chuàng)建一個(gè)可寫流 let ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 }); let ws2 = fs.createWriteStream('./3.txt', { highWaterMark: 3 }); //綁定可寫流到可讀流,自動(dòng)將可讀流切換到流動(dòng)模式,將可讀流的所有數(shù)據(jù)推送到可寫流。 rs.pipe(ws); //可以綁定多個(gè)可寫流 rs.pipe(ws2);
我們也可以用 unpipe() 手動(dòng)的解綁可寫流。
const fs = require('fs'); //創(chuàng)建一個(gè)可讀流 let rs = fs.createReadStream('./1.txt', { highWaterMark: 3 }); //創(chuàng)建一個(gè)可寫流 let ws = fs.createWriteStream('./2.txt', { highWaterMark: 3 }); let ws2 = fs.createWriteStream('./3.txt', { highWaterMark: 3 }); rs.pipe(ws); rs.pipe(ws2); //解綁可寫流,如果參數(shù)沒寫,則解綁所有管道 setTimeout(function () { rs.unpipe(ws2); }, 0);
希望本文所述對大家node.js程序設(shè)計(jì)有所幫助。
- Node.js中的流(Stream)的作用詳解
- node.js同步/異步文件讀寫-fs,Stream文件流操作實(shí)例詳解
- Node.js數(shù)據(jù)流Stream之Duplex流和Transform流用法
- Node.js數(shù)據(jù)流Stream之Readable流和Writable流用法
- node.js使用stream模塊實(shí)現(xiàn)自定義流示例
- Node.js中你不可不精的Stream(流)
- Node.js中流(stream)的使用方法示例
- Node.js中的流(Stream)介紹
- Node.js 中的流Stream模塊簡介及如何使用流進(jìn)行數(shù)據(jù)處理
相關(guān)文章
nodejs中內(nèi)置模塊fs,path常見的用法說明
這篇文章主要介紹了nodejs中內(nèi)置模塊fs,path常見的用法說明,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11在Debian(Raspberry Pi)樹莓派上安裝NodeJS的教程詳解
在樹莓派上運(yùn)行NodeJS并不需要特別的配置,你只需要確??梢杂胦penssh遠(yuǎn)程連接到你的樹莓派就ok了,關(guān)于在Debian(Raspberry Pi)樹莓派上安裝NodeJS的方法,大家可以通過本文了解下2017-09-09node.js將MongoDB數(shù)據(jù)同步到MySQL的步驟
這篇文章主要給大家介紹了關(guān)于node.js將MongoDB數(shù)據(jù)同步到MySQL的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12Node.js利用Express實(shí)現(xiàn)用戶注冊登陸功能(推薦)
這篇文章主要介紹了Node.js利用Express實(shí)現(xiàn)用戶注冊登陸功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10Node 使用express-http-proxy 做api網(wǎng)關(guān)的實(shí)現(xiàn)
這篇文章主要介紹了Node 使用express-http-proxy 做api網(wǎng)關(guān)的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10window系統(tǒng) nodejs安裝opencv環(huán)境配置圖文詳解
這篇文章主要介紹了window系統(tǒng) nodejs安裝opencv環(huán)境配置,結(jié)合圖文形式詳細(xì)分析了window環(huán)境下 nodejs安裝opencv的具體步驟、注意事項(xiàng)2023-04-04