亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Node.js高級(jí)編程cluster環(huán)境及源碼調(diào)試詳解

 更新時(shí)間:2022年12月29日 16:02:10   作者:Aaaaaaaaaaayou  
這篇文章主要為大家介紹了Node.js高級(jí)編程cluster環(huán)境及源碼調(diào)試詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

日常工作中,對(duì) Node.js 的使用都比較粗淺,趁未羊之際,來(lái)學(xué)點(diǎn)稍微高級(jí)的,那就先從 cluster 開始吧。

尼古拉斯張三說(shuō)過(guò),“帶著問(wèn)題去學(xué)習(xí)是一個(gè)比較好的方法”,所以我們也來(lái)試一試。

當(dāng)初使用 cluster 時(shí),一直好奇它是怎么做到多個(gè)子進(jìn)程監(jiān)聽同一個(gè)端口而不沖突的,比如下面這段代碼:

const cluster = require('cluster')
const net = require('net')
const cpus = require('os').cpus()
if (cluster.isPrimary) {
  for (let i = 0; i < cpus.length; i++) {
    cluster.fork()
  }
} else {
  net
    .createServer(function (socket) {
      socket.on('data', function (data) {
        socket.write(`Reply from ${process.pid}: ` + data.toString())
      })
      socket.on('end', function () {
        console.log('Close')
      })
      socket.write('Hello!\n')
    })
    .listen(9999)
}

該段代碼通過(guò)父進(jìn)程 fork 出了多個(gè)子進(jìn)程,且這些子進(jìn)程都監(jiān)聽了 9999 這個(gè)端口并能正常提供服務(wù),這是如何做到的呢?我們來(lái)研究一下。

準(zhǔn)備調(diào)試環(huán)境

學(xué)習(xí) Node.js 官方提供庫(kù)最好的方式當(dāng)然是調(diào)試一下,所以,我們先來(lái)準(zhǔn)備一下環(huán)境。注:本文的操作系統(tǒng)為 macOS Big Sur 11.6.6,其他系統(tǒng)請(qǐng)自行準(zhǔn)備相應(yīng)環(huán)境。

編譯 Node.js

  • 下載 Node.js 源碼
git clone https://github.com/nodejs/node.git

然后在下面這兩個(gè)地方加入斷點(diǎn),方便后面調(diào)試用:

// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  debugger;
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect) return;
  ...
}
// lib/internal/cluster/child.js
send(message, (reply, handle) => {
  debugger
  if (typeof obj._setServerData === 'function') obj._setServerData(reply.data)
  if (handle) {
    // Shared listen socket
    shared(reply, {handle, indexesKey, index}, cb)
  } else {
    // Round-robin.
    rr(reply, {indexesKey, index}, cb)
  }
})
  • 進(jìn)入目錄,執(zhí)行
./configure --debug
make -j4

之后會(huì)生成 out/Debug/node

準(zhǔn)備 IDE 環(huán)境

使用 vscode 調(diào)試,配置好 launch.json 就可以了(其他 IDE 類似,請(qǐng)自行解決):

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Debug C++",
      "type": "cppdbg",
      "program": "/Users/youxingzhi/ayou/node/out/Debug/node",
      "request": "launch",
      "args": ["/Users/youxingzhi/ayou/node/index.js"],
      "stopAtEntry": false,
      "cwd": "${workspaceFolder}",
      "environment": [],
      "externalConsole": false,
      "MIMode": "lldb"
    },
    {
      "name": "Debug Node",
      "type": "node",
      "runtimeExecutable": "/Users/youxingzhi/ayou/node/out/Debug/node",
      "request": "launch",
      "args": ["--expose-internals", "--nolazy"],
      "skipFiles": [],
      "program": "${workspaceFolder}/index.js"
    }
  ]
}

其中第一個(gè)是用于調(diào)式 C++ 代碼(需要安裝 C/C++ 插件),第二個(gè)用于調(diào)式 JS 代碼。接下來(lái)就可以開始調(diào)試了,我們暫時(shí)用調(diào)式 JS 代碼的那個(gè)配置就好了。

Cluster 源碼調(diào)試

準(zhǔn)備好調(diào)試代碼(為了調(diào)試而已,這里啟動(dòng)一個(gè)子進(jìn)程就夠了):

debugger
const cluster = require('cluster')
const net = require('net')
if (cluster.isPrimary) {
  debugger
  cluster.fork()
} else {
  const server = net.createServer(function (socket) {
    socket.on('data', function (data) {
      socket.write(`Reply from ${process.pid}: ` + data.toString())
    })
    socket.on('end', function () {
      console.log('Close')
    })
    socket.write('Hello!\n')
  })
  debugger
  server.listen(9999)
}

很明顯,我們的程序可以分父進(jìn)程和子進(jìn)程這兩部分來(lái)進(jìn)行分析。

首先進(jìn)入的是父進(jìn)程:

執(zhí)行 require('cluster') 時(shí),會(huì)進(jìn)入 lib/cluster.js 這個(gè)文件:

const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'primary'
module.exports = require(`internal/cluster/${childOrPrimary}`)

會(huì)根據(jù)當(dāng)前 process.env 上是否有 NODE_UNIQUE_ID 來(lái)引入不同的模塊,此時(shí)是沒有的,所以會(huì)引入 internal/cluster/primary.js 這個(gè)模塊:

...
const cluster = new EventEmitter();
...
module.exports = cluster
const handles = new SafeMap()
cluster.isWorker = false
cluster.isMaster = true // Deprecated alias. Must be same as isPrimary.
cluster.isPrimary = true
cluster.Worker = Worker
cluster.workers = {}
cluster.settings = {}
cluster.SCHED_NONE = SCHED_NONE // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR // Primary distributes connections.
...
cluster.schedulingPolicy = schedulingPolicy
cluster.setupPrimary = function (options) {
...
}
// Deprecated alias must be same as setupPrimary
cluster.setupMaster = cluster.setupPrimary
function setupSettingsNT(settings) {
...
}
function createWorkerProcess(id, env) {
  ...
}
function removeWorker(worker) {
 ...
}
function removeHandlesForWorker(worker) {
 ...
}
cluster.fork = function (env) {
  ...
}

該模塊主要是在 cluster 對(duì)象上掛載了一些屬性和方法,并導(dǎo)出,這些后面回過(guò)頭再看,我們繼續(xù)往下調(diào)試。往下調(diào)試會(huì)進(jìn)入 if (cluster.isPrimary) 分支,代碼很簡(jiǎn)單,僅僅是 fork 出了一個(gè)新的子進(jìn)程而已:

// lib/internal/cluster/primary.js
cluster.fork = function (env) {
  cluster.setupPrimary()
  const id = ++ids
  const workerProcess = createWorkerProcess(id, env)
  const worker = new Worker({
    id: id,
    process: workerProcess,
  })
  ...
  worker.process.on('internalMessage', internal(worker, onmessage))
  process.nextTick(emitForkNT, worker)
  cluster.workers[worker.id] = worker
  return worker
}

cluster.setupPrimary():比較簡(jiǎn)單,初始化一些參數(shù)啥的。

createWorkerProcess(id, env)

// lib/internal/cluster/primary.js
function createWorkerProcess(id, env) {
  const workerEnv = {...process.env, ...env, NODE_UNIQUE_ID: `${id}`}
  const execArgv = [...cluster.settings.execArgv]
  ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid,
  })
}

可以看到,該方法主要是通過(guò) fork 啟動(dòng)了一個(gè)子進(jìn)程來(lái)執(zhí)行我們的 index.js,且啟動(dòng)子進(jìn)程的時(shí)候設(shè)置了環(huán)境變量 NODE_UNIQUE_ID,這樣 index.jsrequire('cluster') 的時(shí)候,引入的就是 internal/cluster/child.js 模塊了。

worker.process.on('internalMessage', internal(worker, onmessage)):監(jiān)聽子進(jìn)程傳遞過(guò)來(lái)的消息并處理。

接下來(lái)就進(jìn)入了子進(jìn)程的邏輯:

前面說(shuō)了,此時(shí)引入的是 internal/cluster/child.js 模塊,我們先跳過(guò),繼續(xù)往下,執(zhí)行 server.listen(9999) 時(shí)實(shí)際上是調(diào)用了 Server 上的方法:

// lib/net.js
Server.prototype.listen = function (...args) {
  ...
      listenInCluster(
        this,
        null,
        options.port | 0,
        4,
        backlog,
        undefined,
        options.exclusive
      );
}

可以看到,最終是調(diào)用了 listenInCluster

// lib/net.js
function listenInCluster(
  server,
  address,
  port,
  addressType,
  backlog,
  fd,
  exclusive,
  flags,
  options
) {
  exclusive = !!exclusive
  if (cluster === undefined) cluster = require('cluster')
  if (cluster.isPrimary || exclusive) {
    // Will create a new handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags)
    return
  }
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
    backlog,
    ...options,
  }
  // Get the primary's server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnPrimaryHandle)
  function listenOnPrimaryHandle(err, handle) {
    err = checkBindError(err, port, handle)
    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port)
      return server.emit('error', ex)
    }
    // Reuse primary's server handle
    server._handle = handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags)
  }
}

由于是在子進(jìn)程中執(zhí)行,所以最后會(huì)調(diào)用 cluster._getServer(server, serverQuery, listenOnPrimaryHandle)

// lib/internal/cluster/child.js
// 這里的 cb 就是上面的 listenOnPrimaryHandle
cluster._getServer = function (obj, options, cb) {
  ...
  send(message, (reply, handle) => {
    debugger
    if (typeof obj._setServerData === 'function') obj._setServerData(reply.data)
    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb)
    }
  })
  ...
}

該函數(shù)最終會(huì)向父進(jìn)程發(fā)送 queryServer 的消息,父進(jìn)程處理完后會(huì)調(diào)用回調(diào)函數(shù),回調(diào)函數(shù)中會(huì)調(diào)用 cblistenOnPrimaryHandle??磥?lái),listen 的邏輯是在父進(jìn)程中進(jìn)行的了。

接下來(lái)進(jìn)入父進(jìn)程:

父進(jìn)程收到 queryServer 的消息后,最終會(huì)調(diào)用 queryServer 這個(gè)方法:

// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect) return
  const key =
    `${message.address}:${message.port}:${message.addressType}:` +
    `${message.fd}:${message.index}`
  let handle = handles.get(key)
  if (handle === undefined) {
    let address = message.address
    // Find shortest path for unix sockets because of the ~100 byte limit
    if (
      message.port < 0 &&
      typeof address === 'string' &&
      process.platform !== 'win32'
    ) {
      address = path.relative(process.cwd(), address)
      if (message.address.length < address.length) address = message.address
    }
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (
      schedulingPolicy !== SCHED_RR ||
      message.addressType === 'udp4' ||
      message.addressType === 'udp6'
    ) {
      handle = new SharedHandle(key, address, message)
    } else {
      handle = new RoundRobinHandle(key, address, message)
    }
    handles.set(key, handle)
  }
  ...
}

可以看到,這里主要是對(duì) handle 的處理,這里的 handle 指的是調(diào)度策略,分為 SharedHandleRoundRobinHandle,分別對(duì)應(yīng)搶占式和輪詢兩種策略(文章最后補(bǔ)充部分有關(guān)于兩者對(duì)比的例子)。

Node.js 中默認(rèn)是 RoundRobinHandle 策略,可通過(guò)環(huán)境變量 NODE_CLUSTER_SCHED_POLICY 來(lái)修改,取值可以為 noneSharedHandle) 或 rrRoundRobinHandle)。

SharedHandle

首先,我們來(lái)看一下 SharedHandle,由于我們這里是 TCP 協(xié)議,所以最后會(huì)通過(guò) net._createServerHandle 創(chuàng)建一個(gè) TCP 對(duì)象掛載在 handle 屬性上(注意這里又有一個(gè) handle,別搞混了):

// lib/internal/cluster/shared_handle.js
function SharedHandle(key, address, {port, addressType, fd, flags}) {
  this.key = key
  this.workers = new SafeMap()
  this.handle = null
  this.errno = 0
  let rval
  if (addressType === 'udp4' || addressType === 'udp6')
    rval = dgram._createSocketHandle(address, port, addressType, fd, flags)
  else rval = net._createServerHandle(address, port, addressType, fd, flags)
  if (typeof rval === 'number') this.errno = rval
  else this.handle = rval
}

createServerHandle 中除了創(chuàng)建 TCP 對(duì)象外,還綁定了端口和地址:

// lib/net.js
function createServerHandle(address, port, addressType, fd, flags) {
  ...
  } else {
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }
  if (address || port || isTCP) {
      ...
      err = handle.bind6(address, port, flags);
    } else {
      err = handle.bind(address, port);
    }
  }
  ...
  return handle;
}

然后,queryServer 中繼續(xù)執(zhí)行,會(huì)調(diào)用 add 方法,最終會(huì)將 handle 也就是 TCP 對(duì)象傳遞給子進(jìn)程:

// lib/internal/cluster/primary.js
function queryServer(worker, message) {
  ...
  if (!handle.data) handle.data = message.data
  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    const {data} = handles.get(key)
    if (errno) handles.delete(key) // Gives other workers a chance to retry.
    send(
      worker,
      {
        errno,
        key,
        ack: message.seq,
        data,
        ...reply,
      },
      handle // TCP 對(duì)象
    )
  })
  ...
}

之后進(jìn)入子進(jìn)程:

子進(jìn)程收到父進(jìn)程對(duì)于 queryServer 的回復(fù)后,會(huì)調(diào)用 shared

// lib/internal/cluster/child.js
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function (obj, options, cb) {
  ...
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function') obj._setServerData(reply.data)
    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle
    }
  })
  ...
}

shared 中最后會(huì)調(diào)用 cb 也就是 listenOnPrimaryHandle

// lib/net.js
function listenOnPrimaryHandle(err, handle) {
  err = checkBindError(err, port, handle)
  if (err) {
    const ex = exceptionWithHostPort(err, 'bind', address, port)
    return server.emit('error', ex)
  }
  // Reuse primary's server handle 這里的 server 是 index.js 中 net.createServer 返回的那個(gè)對(duì)象
  server._handle = handle
  // _listen2 sets up the listened handle, it is still named like this
  // to avoid breaking code that wraps this method
  server._listen2(address, port, addressType, backlog, fd, flags)
}

這里會(huì)把 handle 賦值給 server._handle,這里的 serverindex.jsnet.createServer 返回的那個(gè)對(duì)象,并調(diào)用 server._listen2,也就是 setupListenHandle

// lib/net.js
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd)
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already')
  } else {
    ...
  }
  this[async_id_symbol] = getNewAsyncId(this._handle)
  this._handle.onconnection = onconnection
  this._handle[owner_symbol] = this
  // Use a backlog of 512 entries. We pass 511 to the listen() call because
  // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
  // which will thus give us a backlog of 512 entries.
  const err = this._handle.listen(backlog || 511)
  if (err) {
    const ex = uvExceptionWithHostPort(err, 'listen', address, port)
    this._handle.close()
    this._handle = null
    defaultTriggerAsyncIdScope(
      this[async_id_symbol],
      process.nextTick,
      emitErrorNT,
      this,
      ex
    )
    return
  }
}

首先會(huì)執(zhí)行 this._handle.onconnection = onconnection,由于客戶端請(qǐng)求過(guò)來(lái)時(shí)會(huì)調(diào)用 this._handle(也就是 TCP 對(duì)象)上的 onconnection 方法,也就是會(huì)執(zhí)行 lib/net.js 中的 onconnection 方法建立連接,之后就可以通信了。為了控制篇幅,該方法就不繼續(xù)往下了。

然后調(diào)用 listen 監(jiān)聽,注意這里參數(shù) backlog 跟之前不同,不是表示端口,而是表示在拒絕連接之前,操作系統(tǒng)可以掛起的最大連接數(shù)量,也就是連接請(qǐng)求的排隊(duì)數(shù)量。我們平時(shí)遇到的 listen EADDRINUSE: address already in use 錯(cuò)誤就是因?yàn)檫@行代碼返回了非 0 的錯(cuò)誤。

如果還有其他子進(jìn)程,也會(huì)同樣走一遍上述的步驟,不同之處是在主進(jìn)程中 queryServer 時(shí),由于已經(jīng)有 handle 了,不需要再重新創(chuàng)建了:

function queryServer(worker, message) {
  debugger;
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect) return;
  const key =
    `${message.address}:${message.port}:${message.addressType}:` +
    `${message.fd}:${message.index}`;
  let handle = handles.get(key);
  ...
}

以上內(nèi)容整理成流程圖如下:

所謂的 SharedHandle,其實(shí)是在多個(gè)子進(jìn)程中共享 TCP 對(duì)象的句柄,當(dāng)客戶端請(qǐng)求過(guò)來(lái)時(shí),多個(gè)進(jìn)程會(huì)去競(jìng)爭(zhēng)該請(qǐng)求的處理權(quán),會(huì)導(dǎo)致任務(wù)分配不均的問(wèn)題,這也是為什么需要 RoundRobinHandle 的原因。接下來(lái)繼續(xù)看看這種調(diào)度方式。

RoundRobinHandle

// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(
  key,
  address,
  {port, fd, flags, backlog, readableAll, writableAll}
) {
  ...
  this.server = net.createServer(assert.fail)
  ...
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    })
  }
  ...
  this.server.once('listening', () => {
    this.handle = this.server._handle
    this.handle.onconnection = (err, handle) => {
      this.distribute(err, handle)
    }
    this.server._handle = null
    this.server = null
  })
}

如上所示,RoundRobinHandle 會(huì)調(diào)用 net.createServer() 創(chuàng)建一個(gè) server,然后調(diào)用 listen 方法,最終會(huì)來(lái)到 setupListenHandle

// lib/net.js
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  debug('setupListenHandle', address, port, addressType, backlog, fd)
  // If there is not yet a handle, we need to create one and bind.
  // In the case of a server sent via IPC, we don't need to do this.
  if (this._handle) {
    debug('setupListenHandle: have a handle already')
  } else {
    debug('setupListenHandle: create a handle')
    let rval = null
    // Try to bind to the unspecified IPv6 address, see if IPv6 is available
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags)
      if (typeof rval === 'number') {
        rval = null
        address = DEFAULT_IPV4_ADDR
        addressType = 4
      } else {
        address = DEFAULT_IPV6_ADDR
        addressType = 6
      }
    }
    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags)
    if (typeof rval === 'number') {
      const error = uvExceptionWithHostPort(rval, 'listen', address, port)
      process.nextTick(emitErrorNT, this, error)
      return
    }
    this._handle = rval
  }
  this[async_id_symbol] = getNewAsyncId(this._handle)
  this._handle.onconnection = onconnection
  this._handle[owner_symbol] = this
  ...
}

且由于此時(shí) this._handle 為空,會(huì)調(diào)用 createServerHandle() 生成一個(gè) TCP 對(duì)象作為 _handle。之后就跟 SharedHandle 一樣了,最后也會(huì)回到子進(jìn)程:

// lib/internal/cluster/child.js
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function (obj, options, cb) {
  ...
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function') obj._setServerData(reply.data)
    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle
    }
  })
  ...
}

不過(guò)由于 RoundRobinHandle 不會(huì)傳遞 handle 給子進(jìn)程,所以此時(shí)會(huì)執(zhí)行 rr

function rr(message, {indexesKey, index}, cb) {
  ...
  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = {close, listen, ref: noop, unref: noop}
  if (message.sockname) {
    handle.getsockname = getsockname // TCP handles only.
  }
  assert(handles.has(key) === false)
  handles.set(key, handle)
  debugger
  cb(0, handle)
}

可以看到,這里構(gòu)造了一個(gè)假的 handle,然后執(zhí)行 cb 也就是 listenOnPrimaryHandle。最終跟 SharedHandle 一樣會(huì)調(diào)用 setupListenHandle 執(zhí)行 this._handle.onconnection = onconnection。

RoundRobinHandle 邏輯到此就結(jié)束了,好像缺了點(diǎn)什么的樣子?;仡櫹?,我們給每個(gè)子進(jìn)程中的 server 上都掛載了一個(gè)假的 handle,但它跟綁定了端口的 TCP 對(duì)象沒有任何關(guān)系,如果客戶端請(qǐng)求過(guò)來(lái)了,是不會(huì)執(zhí)行它上面的 onconnection 方法的。之所以要這樣寫,估計(jì)是為了保持跟之前 SharedHandle 代碼邏輯的統(tǒng)一。

此時(shí),我們需要回到 RoundRobinHandle,有這樣一段代碼:

// lib/internal/cluster/round_robin_handle.js
this.server.once('listening', () => {
  this.handle = this.server._handle
  this.handle.onconnection = (err, handle) => {
    this.distribute(err, handle)
  }
  this.server._handle = null
  this.server = null
})

listen 執(zhí)行完后,會(huì)觸發(fā) listening 事件的回調(diào),這里重寫了 handle 上面的 onconnection

所以,當(dāng)客戶端請(qǐng)求過(guò)來(lái)時(shí),會(huì)調(diào)用 distribute 在多個(gè)子進(jìn)程中輪詢分發(fā),這里又有一個(gè) handle,這里的 handle 姑且理解為 clientHandle,即客戶端連接的 handle,別搞混了??傊?,最后會(huì)將這個(gè) clientHandle 發(fā)送給子進(jìn)程:

// lib/internal/cluster/round_robin_handle.js
RoundRobinHandle.prototype.handoff = function (worker) {
  ...
  const message = { act: 'newconn', key: this.key };
  // 這里的 handle 是 clientHandle
  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted) handle.close();
    else this.distribute(0, handle); // Worker is shutting down. Send to another.
    this.handoff(worker);
  });
};

而子進(jìn)程在 require('cluster') 時(shí),已經(jīng)監(jiān)聽了該事件:

// lib/internal/cluster/child.js
process.on('internalMessage', internal(worker, onmessage))
send({act: 'online'})
function onmessage(message, handle) {
  if (message.act === 'newconn') onconnection(message, handle)
  else if (message.act === 'disconnect')
    ReflectApply(_disconnect, worker, [true])
}

最終也同樣會(huì)走到 net.js 中的 function onconnection(err, clientHandle) 方法。這個(gè)方法第二個(gè)參數(shù)名就叫 clientHandle,這也是為什么前面的 handle 我想叫這個(gè)名字的原因。

還是用圖來(lái)總結(jié)下:

SharedHandle 不同的是,該調(diào)度策略中 onconnection 最開始是在主進(jìn)程中觸發(fā)的,然后通過(guò)輪詢算法挑選一個(gè)子進(jìn)程,將 clientHandle 傳遞給它。

為什么端口不沖突

cluster 模塊的調(diào)試就到此告一段落了,接下來(lái)我們來(lái)回答一下一開始的問(wèn)題,為什么多個(gè)進(jìn)程監(jiān)聽同一個(gè)端口沒有報(bào)錯(cuò)?

網(wǎng)上有些文章說(shuō)是因?yàn)樵O(shè)置了 SO_REUSEADDR,但其實(shí)跟這個(gè)沒關(guān)系。通過(guò)上面的分析知道,不管什么調(diào)度策略,最終都只會(huì)在主進(jìn)程中對(duì) TCP 對(duì)象 bind 一次。

我們可以修改一下源代碼來(lái)測(cè)試一下:

// deps/uv/src/unix/tcp.c 下面的 SO_REUSEADDR 改成 SO_DEBUG
if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))

編譯后執(zhí)行發(fā)現(xiàn),我們?nèi)匀豢梢哉J褂?cluster 模塊。

那這個(gè) SO_REUSEADDR 到底影響的是啥呢?我們繼續(xù)來(lái)研究一下。

SO_REUSEADDR

首先,我們我們知道,下面的代碼是會(huì)報(bào)錯(cuò)的:

const net = require('net')
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999)
server2.listen(9999)

但是,如果我稍微修改一下,就不會(huì)報(bào)錯(cuò)了:

const net = require('net')
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999, '127.0.0.1')
server2.listen(9999, '10.53.48.67')

原因在于 listen 時(shí),如果不指定 address,則相當(dāng)于綁定了所有地址,當(dāng)兩個(gè) server 都這樣做時(shí),請(qǐng)求到來(lái)就不知道要給誰(shuí)處理了。

我們可以類比成找對(duì)象,port 是對(duì)外貌的要求,address 是對(duì)城市的要求?,F(xiàn)在甲乙都想要一個(gè) port 是 1米7以上 不限城市的對(duì)象,那如果有一個(gè) 1米7以上 來(lái)自 深圳 的對(duì)象,就不知道介紹給誰(shuí)了。而如果兩者都指定了城市就好辦多了。

那如果一個(gè)指定了 address,一個(gè)沒有呢?就像下面這樣:

const net = require('net')
const server1 = net.createServer()
const server2 = net.createServer()
server1.listen(9999, '127.0.0.1')
server2.listen(9999)

結(jié)果是:設(shè)置了 SO_REUSEADDR 可以正常運(yùn)行,而修改成 SO_DEBUG 的會(huì)報(bào)錯(cuò)。

還是上面的例子,甲對(duì)城市沒有限制,乙需要是來(lái)自 深圳 的,那當(dāng)一個(gè)對(duì)象來(lái)自 深圳,我們可以選擇優(yōu)先介紹給乙,非 深圳 的就選擇介紹給甲,這個(gè)就是 SO_REUSEADDR 的作用。

補(bǔ)充

SharedHandle 和 RoundRobinHandle 兩種模式的對(duì)比

先準(zhǔn)備下測(cè)試代碼:

// cluster.js
const cluster = require('cluster')
const net = require('net')
if (cluster.isMaster) {
  for (let i = 0; i < 4; i++) {
    cluster.fork()
  }
} else {
  const server = net.createServer()
  server.on('connection', (socket) => {
    console.log(`PID: ${process.pid}!`)
  })
  server.listen(9997)
}
// client.js
const net = require('net')
for (let i = 0; i < 20; i++) {
  net.connect({port: 9997})
}

RoundRobin 先執(zhí)行 node cluster.js,然后執(zhí)行 node client.js,會(huì)看到如下輸出,可以看到?jīng)]有任何一個(gè)進(jìn)程的 PID 是緊挨著的。至于為什么沒有一直按照一樣的順序,后面再研究一下。

PID: 42904!
PID: 42906!
PID: 42905!
PID: 42904!
PID: 42907!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42907!
PID: 42904!
PID: 42905!
PID: 42906!
PID: 42904!

Shared

先執(zhí)行 NODE_CLUSTER_SCHED_POLICY=none node cluster.js,則 Node.js 會(huì)使用 SharedHandle,然后執(zhí)行 node client.js,會(huì)看到如下輸出,可以看到同一個(gè) PID 連續(xù)輸出了多次,所以這種策略會(huì)導(dǎo)致進(jìn)程任務(wù)分配不均的現(xiàn)象。就像公司里有些人忙到 996,有些人天天摸魚,這顯然不是老板愿意看到的現(xiàn)象,所以不推薦使用。

PID: 42561!
PID: 42562!
PID: 42561!
PID: 42562!
PID: 42564!
PID: 42561!
PID: 42562!
PID: 42563!
PID: 42561!
PID: 42562!
PID: 42563!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42564!
PID: 42563!
PID: 42563!
PID: 42564!
PID: 42563!

以上就是Node.js高級(jí)編程cluster環(huán)境及源碼調(diào)試詳解的詳細(xì)內(nèi)容,更多關(guān)于Node.js高級(jí)編程cluster的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Node.js中的文件系統(tǒng)(file system)模塊詳解

    Node.js中的文件系統(tǒng)(file system)模塊詳解

    Node.js文件系統(tǒng)模塊提供了豐富的方法,用于讀取、寫入、操作文件和目錄,文件系統(tǒng)模塊是Node.js強(qiáng)大而靈活的一部分,為文件操作提供了方便的API,本文給大家介紹Node.js中的文件系統(tǒng)(file system)模塊,感興趣的朋友一起看看吧
    2023-11-11
  • node.js中cluster的使用教程

    node.js中cluster的使用教程

    這篇文章主要介紹了node.js中cluster的使用教程,分別介紹使用NODE中cluster利用多核CPU、通過(guò)消息傳遞來(lái)監(jiān)控工作進(jìn)程狀態(tài)以及終止進(jìn)程等功能,給出了詳細(xì)的示例代碼供大家參考學(xué)習(xí),需要的朋友們下面來(lái)一起看看吧
    2017-06-06
  • npm與nrm兩種方式查看源和切換鏡像詳解

    npm與nrm兩種方式查看源和切換鏡像詳解

    nrm(npm registry manager )是npm的鏡像源管理工具,它可以快速在讓你在本地源之間切換,下面這篇文章主要給大家介紹了關(guān)于npm與nrm兩種方式查看源和切換鏡像的相關(guān)資料,需要的朋友可以參考下
    2023-02-02
  • Nodejs開發(fā)grpc的實(shí)例代碼

    Nodejs開發(fā)grpc的實(shí)例代碼

    Nodejs開發(fā)grpc包含靜態(tài)和動(dòng)態(tài)兩種代碼生成方式,靜態(tài)代碼生成需要提前通過(guò).proto文件編譯生成JS源碼,而動(dòng)態(tài)代碼生成則是在運(yùn)行時(shí)指定IDL文件位置,實(shí)時(shí)生成源碼,兩者各有優(yōu)缺點(diǎn),本文給大家介紹Nodejs開發(fā)grpc的實(shí)例代碼,感興趣的朋友一起看看吧
    2024-10-10
  • node.js入門教程之querystring模塊的使用方法

    node.js入門教程之querystring模塊的使用方法

    querystring模塊主要用來(lái)解析查詢字符串,下面這篇文章主要介紹了關(guān)于node.js中querystring模塊使用方法的相關(guān)資料,需要的朋友可以參考借鑒,下面來(lái)一起看看吧。
    2017-02-02
  • node.js學(xué)習(xí)之base64編碼解碼

    node.js學(xué)習(xí)之base64編碼解碼

    開發(fā)者對(duì)Base64編碼肯定很熟悉,是否對(duì)它有很清晰的認(rèn)識(shí)就不一定了。實(shí)際上Base64已經(jīng)簡(jiǎn)單到不能再簡(jiǎn)單了,這篇文章給大家通過(guò)示例代碼介紹了node.js對(duì)字符串和圖片base64編碼解碼的方法,有需要的朋友們可以通過(guò)本文來(lái)進(jìn)行學(xué)習(xí),下面來(lái)一起看看吧。
    2016-10-10
  • Node.js爬蟲如何獲取天氣和每日問(wèn)候詳解

    Node.js爬蟲如何獲取天氣和每日問(wèn)候詳解

    這篇文章主要給大家介紹了關(guān)于Node.js爬蟲如何獲取天氣和每日問(wèn)候的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Node.js爬蟲具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • 詳解express使用vue-router的history踩坑

    詳解express使用vue-router的history踩坑

    這篇文章主要介紹了express 使用 vue-router 的 history 踩坑,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-06-06
  • node.js連接mysql與基本用法示例

    node.js連接mysql與基本用法示例

    這篇文章主要介紹了node.js連接mysql與基本用法,結(jié)合實(shí)例形式分析了nodejs中mysql模塊的安裝、引入、創(chuàng)建連接、sql語(yǔ)句執(zhí)行等相關(guān)操作技巧,需要的朋友可以參考下
    2019-01-01
  • 利用n 升級(jí)工具升級(jí)Node.js版本及在mac環(huán)境下的坑

    利用n 升級(jí)工具升級(jí)Node.js版本及在mac環(huán)境下的坑

    這篇文章主要介紹了利用n 升級(jí)工具升級(jí)Node.js的方法,以及通過(guò)網(wǎng)友的測(cè)試發(fā)現(xiàn)在mac環(huán)境下利用n工具升級(jí)不成功導(dǎo)致node.js不可用的解決方法,有需要的朋友可以參考借鑒,下面來(lái)一起看看吧。
    2017-02-02

最新評(píng)論