Node.js pipe實(shí)現(xiàn)源碼解析
從前面兩篇文章,我們了解到。想要把 Readable 的數(shù)據(jù)寫(xiě)到 Writable,就必須先手動(dòng)的將數(shù)據(jù)讀入內(nèi)存,然后寫(xiě)入 Writable。換句話說(shuō),每次傳遞數(shù)據(jù)時(shí),都需要寫(xiě)如下的模板代碼
readable.on('readable', (err) => { if(err) throw err writable.write(readable.read()) })
為了方便使用,Node.js 提供了 pipe() 方法,讓我們可以優(yōu)雅的傳遞數(shù)據(jù)
readable.pipe(writable)
現(xiàn)在,就讓我們來(lái)看看它是如何實(shí)現(xiàn)的吧
pipe
首先需要先調(diào)用 Readable 的 pipe() 方法
// lib/_stream_readable.js Readable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 記錄 Writable switch (state.pipesCount) { case 0: state.pipes = dest; break; case 1: state.pipes = [state.pipes, dest]; break; default: state.pipes.push(dest); break; } state.pipesCount += 1; // ... src.once('end', endFn); dest.on('unpipe', onunpipe); // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保證 error 事件觸發(fā)時(shí),onerror 首先被執(zhí)行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose); // ... dest.once('finish', onfinish); // ... // 觸發(fā) Writable 的 pipe 事件 dest.emit('pipe', src); // 將 Readable 改為 flow 模式 if (!state.flowing) { debug('pipe resume'); src.resume(); } return dest; };
執(zhí)行 pipe() 函數(shù)時(shí),首先將 Writable 記錄到 state.pipes 中,然后綁定相關(guān)事件,最后如果 Readable 不是 flow 模式,就調(diào)用 resume() 將 Readable 改為 flow 模式
傳遞數(shù)據(jù)
Readable 從數(shù)據(jù)源獲取到數(shù)據(jù)后,觸發(fā) data 事件,執(zhí)行 ondata()
ondata() 相關(guān)代碼:
// lib/_stream_readable.js // 防止在 dest.write(chunk) 內(nèi)調(diào)用 src.push(chunk) 造成 awaitDrain 重復(fù)增加,awaitDrain 不能清零,Readable 卡住的情況 // 詳情見(jiàn) https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) { debug('ondata'); increasedAwaitDrain = false; var ret = dest.write(chunk); if (false === ret && !increasedAwaitDrain) { // 防止在 dest.write() 內(nèi)調(diào)用 src.unpipe(dest),導(dǎo)致 awaitDrain 不能清零,Readable 卡住的情況 if (((state.pipesCount === 1 && state.pipes === dest) || (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1) ) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; increasedAwaitDrain = true; } // 進(jìn)入 pause 模式 src.pause(); } }
在 ondata(chunk) 函數(shù)內(nèi),通過(guò) dest.write(chunk) 將數(shù)據(jù)寫(xiě)入 Writable
此時(shí),在 _write() 內(nèi)部可能會(huì)調(diào)用 src.push(chunk) 或使其 unpipe,這會(huì)導(dǎo)致 awaitDrain 多次增加,不能清零,Readable 卡住
當(dāng)不能再向 Writable 寫(xiě)入數(shù)據(jù)時(shí),Readable 會(huì)進(jìn)入 pause 模式,直到所有的 drain 事件觸發(fā)
觸發(fā) drain 事件,執(zhí)行 ondrain()
// lib/_stream_readable.js var ondrain = pipeOnDrain(src); function pipeOnDrain(src) { return function() { var state = src._readableState; debug('pipeOnDrain', state.awaitDrain); if (state.awaitDrain) state.awaitDrain--; // awaitDrain === 0,且有 data 監(jiān)聽(tīng)器 if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) { state.flowing = true; flow(src); } }; }
每個(gè) drain 事件觸發(fā)時(shí),都會(huì)減少 awaitDrain,直到 awaitDrain 為 0。此時(shí),調(diào)用 flow(src),使 Readable 進(jìn)入 flow 模式
到這里,整個(gè)數(shù)據(jù)傳遞循環(huán)已經(jīng)建立,數(shù)據(jù)會(huì)順著循環(huán)源源不斷的流入 Writable,直到所有數(shù)據(jù)寫(xiě)入完成
unpipe
不管寫(xiě)入過(guò)程中是否出現(xiàn)錯(cuò)誤,最后都會(huì)執(zhí)行 unpipe()
// lib/_stream_readable.js // ... function unpipe() { debug('unpipe'); src.unpipe(dest); } // ... Readable.prototype.unpipe = function(dest) { var state = this._readableState; var unpipeInfo = { hasUnpiped: false }; // 啥也沒(méi)有 if (state.pipesCount === 0) return this; // 只有一個(gè) if (state.pipesCount === 1) { if (dest && dest !== state.pipes) return this; // 沒(méi)有指定就 unpipe 所有 if (!dest) dest = state.pipes; state.pipes = null; state.pipesCount = 0; state.flowing = false; if (dest) dest.emit('unpipe', this, unpipeInfo); return this; } // 沒(méi)有指定就 unpipe 所有 if (!dest) { var dests = state.pipes; var len = state.pipesCount; state.pipes = null; state.pipesCount = 0; state.flowing = false; for (var i = 0; i < len; i++) dests[i].emit('unpipe', this, unpipeInfo); return this; } // 找到指定 Writable,并 unpipe var index = state.pipes.indexOf(dest); if (index === -1) return this; state.pipes.splice(index, 1); state.pipesCount -= 1; if (state.pipesCount === 1) state.pipes = state.pipes[0]; dest.emit('unpipe', this, unpipeInfo); return this; };
Readable.prototype.unpipe() 函數(shù)會(huì)根據(jù) state.pipes 屬性和 dest 參數(shù),選擇執(zhí)行策略。最后會(huì)觸發(fā) dest 的 unpipe 事件
unpipe 事件觸發(fā)后,調(diào)用 onunpipe(),清理相關(guān)數(shù)據(jù)
// lib/_stream_readable.js function onunpipe(readable, unpipeInfo) { debug('onunpipe'); if (readable === src) { if (unpipeInfo && unpipeInfo.hasUnpiped === false) { unpipeInfo.hasUnpiped = true; // 清理相關(guān)數(shù)據(jù) cleanup(); } } }
End
在整個(gè) pipe 的過(guò)程中,Readable 是主動(dòng)方 ( 負(fù)責(zé)整個(gè) pipe 過(guò)程:包括數(shù)據(jù)傳遞、unpipe 與異常處理 ),Writable 是被動(dòng)方 ( 只需要觸發(fā) drain 事件 )
總結(jié)一下 pipe 的過(guò)程:
- 首先執(zhí)行 readbable.pipe(writable),將 readable 與 writable 對(duì)接上
- 當(dāng) readable 中有數(shù)據(jù)時(shí),readable.emit('data'),將數(shù)據(jù)寫(xiě)入 writable
- 如果 writable.write(chunk) 返回 false,則進(jìn)入 pause 模式,等待 drain 事件觸發(fā)
- drain 事件全部觸發(fā)后,再次進(jìn)入 flow 模式,寫(xiě)入數(shù)據(jù)
- 不管數(shù)據(jù)寫(xiě)入完成或發(fā)生中斷,最后都會(huì)調(diào)用 unpipe()
- unpipe() 調(diào)用 Readable.prototype.unpipe(),觸發(fā) dest 的 unpipe 事件,清理相關(guān)數(shù)據(jù)
參考:
https://github.com/nodejs/node/blob/master/lib/_stream_readable.js
https://github.com/nodejs/node/blob/master/lib/_stream_writable.js
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
詳解Node.js中path模塊的resolve()和join()方法的區(qū)別
這篇文章主要介紹了詳解Node.js中path模塊的resolve()和join()方法的區(qū)別,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-10-10Node.js中的HTTP請(qǐng)求與響應(yīng)詳解
本文詳細(xì)講解了Node.js中的HTTP請(qǐng)求與響應(yīng),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07實(shí)例詳解Nodejs 保存 payload 發(fā)送過(guò)來(lái)的文件
這篇文章主要介紹了實(shí)例詳解Nodejs 保存 payload 發(fā)送過(guò)來(lái)的文件 的相關(guān)資料,需要的朋友可以參考下2016-01-01nodejs中內(nèi)置模塊fs,path常見(jiàn)的用法說(shuō)明
這篇文章主要介紹了nodejs中內(nèi)置模塊fs,path常見(jiàn)的用法說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-11-11