国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > JavaScript > 正文

Node.js pipe實現源碼解析

2019-11-19 15:49:34
字體:
來源:轉載
供稿:網友

從前面兩篇文章,我們了解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,然后寫入 Writable。換句話說,每次傳遞數據時,都需要寫如下的模板代碼

readable.on('readable', (err) => { if(err) throw err writable.write(readable.read())})

為了方便使用,Node.js 提供了 pipe() 方法,讓我們可以優雅的傳遞數據

readable.pipe(writable)

現在,就讓我們來看看它是如何實現的吧

pipe

首先需要先調用 Readable 的 pipe() 方法

// lib/_stream_readable.jsReadable.prototype.pipe = function(dest, pipeOpts) { var src = this; var state = this._readableState; // 記錄 Writable switch (state.pipesCount) {  case 0:   state.pipes = dest;   break;  case 1:   state.pipes = [state.pipes, dest];   break;  default:   state.pipes.push(dest);   break; } state.pipesCount += 1; // ...  src.once('end', endFn); dest.on('unpipe', onunpipe);  // ... dest.on('drain', ondrain); // ... src.on('data', ondata); // ... // 保證 error 事件觸發時,onerror 首先被執行 prependListener(dest, 'error', onerror); // ... dest.once('close', onclose);  // ... dest.once('finish', onfinish); // ... // 觸發 Writable 的 pipe 事件 dest.emit('pipe', src); // 將 Readable 改為 flow 模式 if (!state.flowing) {  debug('pipe resume');  src.resume(); } return dest;};

執行 pipe() 函數時,首先將 Writable 記錄到 state.pipes 中,然后綁定相關事件,最后如果 Readable 不是 flow 模式,就調用 resume() 將 Readable 改為 flow 模式

傳遞數據

Readable 從數據源獲取到數據后,觸發 data 事件,執行 ondata()

ondata() 相關代碼:

// lib/_stream_readable.js // 防止在 dest.write(chunk) 內調用 src.push(chunk) 造成 awaitDrain 重復增加,awaitDrain 不能清零,Readable 卡住的情況 // 詳情見 https://github.com/nodejs/node/issues/7278 var increasedAwaitDrain = false; function ondata(chunk) {  debug('ondata');  increasedAwaitDrain = false;  var ret = dest.write(chunk);  if (false === ret && !increasedAwaitDrain) {   // 防止在 dest.write() 內調用 src.unpipe(dest),導致 awaitDrain 不能清零,Readable 卡住的情況   if (((state.pipesCount === 1 && state.pipes === dest) ||      (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)     ) &&      !cleanedUp) {    debug('false write response, pause', src._readableState.awaitDrain);    src._readableState.awaitDrain++;    increasedAwaitDrain = true;   }   // 進入 pause 模式   src.pause();  } }

在 ondata(chunk) 函數內,通過 dest.write(chunk) 將數據寫入 Writable

此時,在 _write() 內部可能會調用 src.push(chunk) 或使其 unpipe,這會導致 awaitDrain 多次增加,不能清零,Readable 卡住

當不能再向 Writable 寫入數據時,Readable 會進入 pause 模式,直到所有的 drain 事件觸發

觸發 drain 事件,執行 ondrain()

// lib/_stream_readable.js var ondrain = pipeOnDrain(src); function pipeOnDrain(src) {  return function() {   var state = src._readableState;   debug('pipeOnDrain', state.awaitDrain);   if (state.awaitDrain)    state.awaitDrain--;   // awaitDrain === 0,且有 data 監聽器   if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {    state.flowing = true;    flow(src);   }  }; }

每個 drain 事件觸發時,都會減少 awaitDrain,直到 awaitDrain 為 0。此時,調用 flow(src),使 Readable 進入 flow 模式

到這里,整個數據傳遞循環已經建立,數據會順著循環源源不斷的流入 Writable,直到所有數據寫入完成

unpipe

不管寫入過程中是否出現錯誤,最后都會執行 unpipe()

// lib/_stream_readable.js// ... function unpipe() {  debug('unpipe');  src.unpipe(dest); }// ...Readable.prototype.unpipe = function(dest) { var state = this._readableState; var unpipeInfo = { hasUnpiped: false }; // 啥也沒有 if (state.pipesCount === 0)  return this; // 只有一個 if (state.pipesCount === 1) {  if (dest && dest !== state.pipes)   return this;  // 沒有指定就 unpipe 所有  if (!dest)   dest = state.pipes;  state.pipes = null;  state.pipesCount = 0;  state.flowing = false;  if (dest)   dest.emit('unpipe', this, unpipeInfo);  return this; } // 沒有指定就 unpipe 所有 if (!dest) {  var dests = state.pipes;  var len = state.pipesCount;  state.pipes = null;  state.pipesCount = 0;  state.flowing = false;  for (var i = 0; i < len; i++)   dests[i].emit('unpipe', this, unpipeInfo);  return this; } // 找到指定 Writable,并 unpipe var index = state.pipes.indexOf(dest); if (index === -1)  return this; state.pipes.splice(index, 1); state.pipesCount -= 1; if (state.pipesCount === 1)  state.pipes = state.pipes[0]; dest.emit('unpipe', this, unpipeInfo); return this;};

Readable.prototype.unpipe() 函數會根據 state.pipes 屬性和 dest 參數,選擇執行策略。最后會觸發 dest 的 unpipe 事件

unpipe 事件觸發后,調用 onunpipe(),清理相關數據

// lib/_stream_readable.js function onunpipe(readable, unpipeInfo) {  debug('onunpipe');  if (readable === src) {   if (unpipeInfo && unpipeInfo.hasUnpiped === false) {    unpipeInfo.hasUnpiped = true;    // 清理相關數據    cleanup();   }  } }

End

在整個 pipe 的過程中,Readable 是主動方 ( 負責整個 pipe 過程:包括數據傳遞、unpipe 與異常處理 ),Writable 是被動方 ( 只需要觸發 drain 事件 )

總結一下 pipe 的過程:

  • 首先執行 readbable.pipe(writable),將 readable 與 writable 對接上
  • 當 readable 中有數據時,readable.emit('data'),將數據寫入 writable
  • 如果 writable.write(chunk) 返回 false,則進入 pause 模式,等待 drain 事件觸發
  • drain 事件全部觸發后,再次進入 flow 模式,寫入數據
  • 不管數據寫入完成或發生中斷,最后都會調用 unpipe()
  • unpipe() 調用 Readable.prototype.unpipe(),觸發 dest 的 unpipe 事件,清理相關數據

參考:

https://github.com/nodejs/node/blob/master/lib/_stream_readable.js

https://github.com/nodejs/node/blob/master/lib/_stream_writable.js

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持武林網。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 华阴市| 色达县| 嘉义市| 克拉玛依市| 浦北县| 江阴市| 吉林市| 靖安县| 宜州市| 福清市| 荥经县| 阳曲县| 胶南市| 措勤县| 海安县| 定结县| 平乐县| 伊通| 曲阜市| 五华县| 泽州县| 米脂县| 乌拉特后旗| 万安县| 乌审旗| 衡阳县| 汉阴县| 铜陵市| 钟山县| 永春县| 全椒县| 西和县| 贵定县| 尚义县| 桑日县| 文成县| 大理市| 高淳县| 镇远县| 五峰| 丹巴县|