Nodejs?Socket連接池及TCP?HTTP網(wǎng)絡(luò)模型詳解
前言
作為一名開發(fā)人員我們經(jīng)常會(huì)聽到HTTP協(xié)議、TCP/IP協(xié)議、UDP協(xié)議、Socket、Socket長連接、Socket連接池
等字眼,然而它們之間的關(guān)系、區(qū)別及原理并不是所有人都能理解清楚,這篇文章就從網(wǎng)絡(luò)協(xié)議基礎(chǔ)開始到Socket連接池,一步一步解釋他們之間的關(guān)系。
七層網(wǎng)絡(luò)模型
首先從網(wǎng)絡(luò)通信的分層模型講起:七層模型,亦稱OSI(Open System Interconnection)
模型。自下往上分為:物理層、數(shù)據(jù)鏈路層、網(wǎng)絡(luò)層、傳輸層、會(huì)話層、表示層和應(yīng)用層。所有有關(guān)通信的都離不開它,下面這張圖片介紹了各層所對(duì)應(yīng)的一些協(xié)議和硬件
通過上圖,我知道IP協(xié)議對(duì)應(yīng)于網(wǎng)絡(luò)層,TCP、UDP協(xié)議對(duì)應(yīng)于傳輸層,而HTTP協(xié)議對(duì)應(yīng)于應(yīng)用層,OSI并沒有Socket,那什么是Socket,后面我們將結(jié)合代碼具體詳細(xì)介紹。
TCP和UDP連接
關(guān)于傳輸層TCP、UDP協(xié)議可能我們平時(shí)遇見的會(huì)比較多,有人說TCP是安全的,UDP是不安全的,UDP傳輸比TCP快,那為什么呢,我們先從TCP的連接建立的過程開始分析,然后解釋UDP和TCP的區(qū)別。
TCP的三次握手和四次分手
我們知道TCP建立連接需要經(jīng)過三次握手,而斷開連接需要經(jīng)過四次分手,那三次握手和四次分手分別做了什么和如何進(jìn)行的。
第一次握手:建立連接??蛻舳税l(fā)送連接請(qǐng)求報(bào)文段,將SYN位置為1,Sequence Number為x;然后,客戶端進(jìn)入SYN_SEND
狀態(tài),等待服務(wù)器的確認(rèn);
第二次握手:服務(wù)器收到客戶端的SYN報(bào)文段,需要對(duì)這個(gè)SYN報(bào)文段進(jìn)行確認(rèn),設(shè)置Acknowledgment Number為x+1(Sequence Number+1);同時(shí),自己自己還要發(fā)送SYN請(qǐng)求信息,將SYN位置為1,Sequence Number為y;服務(wù)器端將上述所有信息放到一個(gè)報(bào)文段(即SYN+ACK報(bào)文段)中,一并發(fā)送給客戶端,此時(shí)服務(wù)器進(jìn)入SYN_RECV
狀態(tài);
第三次握手:客戶端收到服務(wù)器的SYN+ACK
報(bào)文段。然后將Acknowledgment Number設(shè)置為y+1,向服務(wù)器發(fā)送ACK報(bào)文段,這個(gè)報(bào)文段發(fā)送完畢以后,客戶端和服務(wù)器端都進(jìn)入ESTABLISHED
狀態(tài),完成TCP三次握手。
完成了三次握手,客戶端和服務(wù)器端就可以開始傳送數(shù)據(jù)。以上就是TCP三次握手的總體介紹。通信結(jié)束客戶端和服務(wù)端就斷開連接,需要經(jīng)過四次分手確認(rèn)。
第一次分手:主機(jī)1(可以使客戶端,也可以是服務(wù)器端),設(shè)置Sequence Number和Acknowledgment Number,向主機(jī)2發(fā)送一個(gè)FIN報(bào)文段;此時(shí),主機(jī)1進(jìn)入FIN_WAIT_1
狀態(tài);這表示主機(jī)1沒有數(shù)據(jù)要發(fā)送給主機(jī)2了;
第二次分手:主機(jī)2收到了主機(jī)1發(fā)送的FIN報(bào)文段,向主機(jī)1回一個(gè)ACK報(bào)文段,Acknowledgment Number為Sequence Number加1;主機(jī)1進(jìn)入FIN_WAIT_2
狀態(tài);主機(jī)2告訴主機(jī)1,我“同意”你的關(guān)閉請(qǐng)求;
第三次分手:主機(jī)2向主機(jī)1發(fā)送FIN報(bào)文段,請(qǐng)求關(guān)閉連接,同時(shí)主機(jī)2進(jìn)入LAST_ACK
狀態(tài);
第四次分手:主機(jī)1收到主機(jī)2發(fā)送的FIN報(bào)文段,向主機(jī)2發(fā)送ACK報(bào)文段,然后主機(jī)1進(jìn)入TIME_WAIT
狀態(tài);主機(jī)2收到主機(jī)1的ACK報(bào)文段以后,就關(guān)閉連接;此時(shí),主機(jī)1等待2MSL后依然沒有收到回復(fù),則證明Server端已正常關(guān)閉,那好,主機(jī)1也可以關(guān)閉連接了。
可以看到一次tcp請(qǐng)求的建立及關(guān)閉至少進(jìn)行7次通信,這還不包過數(shù)據(jù)的通信,而UDP不需3次握手和4次分手。
TCP和UDP的區(qū)別
1、TCP是面向鏈接的,雖然說網(wǎng)絡(luò)的不安全不穩(wěn)定特性決定了多少次握手都不能保證連接的可靠性,但TCP的三次握手在最低限度上(實(shí)際上也很大程度上保證了)保證了連接的可靠性;而UDP不是面向連接的,UDP傳送數(shù)據(jù)前并不與對(duì)方建立連接,對(duì)接收到的數(shù)據(jù)也不發(fā)送確認(rèn)信號(hào),發(fā)送端不知道數(shù)據(jù)是否會(huì)正確接收,當(dāng)然也不用重發(fā),所以說UDP是無連接的、不可靠的一種數(shù)據(jù)傳輸協(xié)議?!?/p>
2、也正由于1所說的特點(diǎn),使得UDP的開銷更小數(shù)據(jù)傳輸速率更高,因?yàn)椴槐剡M(jìn)行收發(fā)數(shù)據(jù)的確認(rèn),所以UDP的實(shí)時(shí)性更好。知道了TCP和UDP的區(qū)別,就不難理解為何采用TCP傳輸協(xié)議的MSN比采用UDP的QQ傳輸文件慢了,但并不能說QQ的通信是不安全的,因?yàn)槌绦騿T可以手動(dòng)對(duì)UDP的數(shù)據(jù)收發(fā)進(jìn)行驗(yàn)證,比如發(fā)送方對(duì)每個(gè)數(shù)據(jù)包進(jìn)行編號(hào)然后由接收方進(jìn)行驗(yàn)證啊什么的,即使是這樣,UDP因?yàn)樵诘讓訁f(xié)議的封裝上沒有采用類似TCP的“三次握手”而實(shí)現(xiàn)了TCP所無法達(dá)到的傳輸效率。
問題
關(guān)于傳輸層我們會(huì)經(jīng)常聽到一些問題
1.TCP服務(wù)器最大并發(fā)連接數(shù)是多少?
關(guān)于TCP服務(wù)器最大并發(fā)連接數(shù)有一種誤解就是“因?yàn)槎丝谔?hào)上限為65535,所以TCP服務(wù)器理論上的可承載的最大并發(fā)連接數(shù)也是65535”。
首先需要理解一條TCP連接的組成部分:客戶端IP、客戶端端口、服務(wù)端IP、服務(wù)端端口。
所以對(duì)于TCP服務(wù)端進(jìn)程來說,他可以同時(shí)連接的客戶端數(shù)量并不受限于可用端口號(hào),理論上一個(gè)服務(wù)器的一個(gè)端口能建立的連接數(shù)是全球的IP數(shù)*每臺(tái)機(jī)器的端口數(shù)
。實(shí)際并發(fā)連接數(shù)受限于linux可打開文件數(shù),這個(gè)數(shù)是可以配置的,可以非常大,所以實(shí)際上受限于系統(tǒng)性能。通過#ulimit -n
查看服務(wù)的最大文件句柄數(shù),通過ulimit -n xxx
修改 xxx是你想要能打開的數(shù)量。也可以通過修改系統(tǒng)參數(shù):
#vi /etc/security/limits.conf * soft nofile 65536 * hard nofile 65536
2.為什么TIME_WAIT
狀態(tài)還需要等2MSL
后才能返回到CLOSED
狀態(tài)?
這是因?yàn)殡m然雙方都同意關(guān)閉連接了,而且握手的4個(gè)報(bào)文也都協(xié)調(diào)和發(fā)送完畢,按理可以直接回到CLOSED狀態(tài)(就好比從SYN_SEND
狀態(tài)到ESTABLISH
狀態(tài)那樣);但是因?yàn)槲覀儽仨氁傧刖W(wǎng)絡(luò)是不可靠的,你無法保證你最后發(fā)送的ACK報(bào)文會(huì)一定被對(duì)方收到,因此對(duì)方處于LAST_ACK
狀態(tài)下的Socket
可能會(huì)因?yàn)槌瑫r(shí)未收到ACK
報(bào)文,而重發(fā)FIN
報(bào)文,所以這個(gè)TIME_WAIT
狀態(tài)的作用就是用來重發(fā)可能丟失的ACK
報(bào)文。
3.TIME_WAIT狀態(tài)還需要等2MSL后才能返回到CLOSED狀態(tài)會(huì)產(chǎn)生什么問題
通信雙方建立TCP連接后,主動(dòng)關(guān)閉連接的一方就會(huì)進(jìn)入TIME_WAIT
狀態(tài),TIME_WAIT
狀態(tài)維持時(shí)間是兩個(gè)MSL時(shí)間長度,也就是在1-4分鐘,Windows操作系統(tǒng)就是4分鐘。進(jìn)入TIME_WAIT
狀態(tài)的一般情況下是客戶端,一個(gè)TIME_WAIT
狀態(tài)的連接就占用了一個(gè)本地端口。一臺(tái)機(jī)器上端口號(hào)數(shù)量的上限是65536個(gè),如果在同一臺(tái)機(jī)器上進(jìn)行壓力測試模擬上萬的客戶請(qǐng)求,并且循環(huán)與服務(wù)端進(jìn)行短連接通信,那么這臺(tái)機(jī)器將產(chǎn)生4000個(gè)左右的TIME_WAIT
Socket,后續(xù)的短連接就會(huì)產(chǎn)生address already in use : connect
的異常,如果使用Nginx
作為方向代理也需要考慮TIME_WAIT
狀態(tài),發(fā)現(xiàn)系統(tǒng)存在大量TIME_WAIT
狀態(tài)的連接,通過調(diào)整內(nèi)核參數(shù)解決。
vi /etc/sysctl.conf
編輯文件,加入以下內(nèi)容:
net.ipv4.tcp_syncookies = 1 net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 1 net.ipv4.tcp_fin_timeout = 30
然后執(zhí)行 /sbin/sysctl -p
讓參數(shù)生效。
net.ipv4.tcp_syncookies = 1 表示開啟SYN Cookies。當(dāng)出現(xiàn)SYN等待隊(duì)列溢出時(shí),啟用cookies來處理,可防范少量SYN攻擊,默認(rèn)為0,表示關(guān)閉;
net.ipv4.tcp_tw_reuse = 1 表示開啟重用。允許將TIME-WAIT sockets重新用于新的TCP連接,默認(rèn)為0,表示關(guān)閉;
net.ipv4.tcp_tw_recycle = 1 表示開啟TCP連接中TIME-WAIT sockets的快速回收,默認(rèn)為0,表示關(guān)閉。
net.ipv4.tcp_fin_timeout 修改系統(tǒng)默認(rèn)的TIMEOUT
時(shí)間
HTTP協(xié)議
關(guān)于TCP/IP和HTTP協(xié)議的關(guān)系,網(wǎng)絡(luò)有一段比較容易理解的介紹:“我們在傳輸數(shù)據(jù)時(shí),可以只使用(傳輸層)TCP/IP協(xié)議,但是那樣的話,如果沒有應(yīng)用層,便無法識(shí)別數(shù)據(jù)內(nèi)容。如果想要使傳輸?shù)臄?shù)據(jù)有意義,則必須使用到應(yīng)用層協(xié)議。應(yīng)用層協(xié)議有很多,比如HTTP、FTP、TELNET等,也可以自己定義應(yīng)用層協(xié)議。
HTTP協(xié)議即超文本傳送協(xié)議(Hypertext Transfer Protocol ),是Web聯(lián)網(wǎng)的基礎(chǔ),也是手機(jī)聯(lián)網(wǎng)常用的協(xié)議之一,WEB使用HTTP協(xié)議作應(yīng)用層協(xié)議,以封裝HTTP文本信息,然后使用TCP/IP做傳輸層協(xié)議將它發(fā)到網(wǎng)絡(luò)上。
由于HTTP在每次請(qǐng)求結(jié)束后都會(huì)主動(dòng)釋放連接,因此HTTP連接是一種“短連接”,要保持客戶端程序的在線狀態(tài),需要不斷地向服務(wù)器發(fā)起連接請(qǐng)求。通常 的做法是即時(shí)不需要獲得任何數(shù)據(jù),客戶端也保持每隔一段固定的時(shí)間向服務(wù)器發(fā)送一次“保持連接”的請(qǐng)求,服務(wù)器在收到該請(qǐng)求后對(duì)客戶端進(jìn)行回復(fù),表明知道 客戶端“在線”。若服務(wù)器長時(shí)間無法收到客戶端的請(qǐng)求,則認(rèn)為客戶端“下線”,若客戶端長時(shí)間無法收到服務(wù)器的回復(fù),則認(rèn)為網(wǎng)絡(luò)已經(jīng)斷開。
下面是一個(gè)簡單的HTTP Post application/json數(shù)據(jù)內(nèi)容的請(qǐng)求:
POST HTTP/1.1 Host: 127.0.0.1:9017 Content-Type: application/json Cache-Control: no-cache {"a":"a"}
關(guān)于Socket(套接字)
現(xiàn)在我們了解到TCP/IP只是一個(gè)協(xié)議棧,就像操作系統(tǒng)的運(yùn)行機(jī)制一樣,必須要具體實(shí)現(xiàn),同時(shí)還要提供對(duì)外的操作接口。就像操作系統(tǒng)會(huì)提供標(biāo)準(zhǔn)的編程接口,比如Win32編程接口一樣,TCP/IP也必須對(duì)外提供編程接口,這就是Socket?,F(xiàn)在我們知道,Socket跟TCP/IP并沒有必然的聯(lián)系。Socket編程接口在設(shè)計(jì)的時(shí)候,就希望也能適應(yīng)其他的網(wǎng)絡(luò)協(xié)議。所以,Socket的出現(xiàn)只是可以更方便的使用TCP/IP協(xié)議棧而已,其對(duì)TCP/IP進(jìn)行了抽象,形成了幾個(gè)最基本的函數(shù)接口。比如create,listen,accept,connect,read和write等等。
不同語言都有對(duì)應(yīng)的建立Socket服務(wù)端和客戶端的庫,下面舉例Nodejs如何創(chuàng)建服務(wù)端和客戶端:
服務(wù)端:
const net = require('net'); const server = net.createServer(); server.on('connection', (client) => { client.write('Hi!\n'); // 服務(wù)端向客戶端輸出信息,使用 write() 方法 client.write('Bye!\n'); //client.end(); // 服務(wù)端結(jié)束該次會(huì)話 }); server.listen(9000);
服務(wù)監(jiān)聽9000端口
下面使用命令行發(fā)送http請(qǐng)求和telnet
$ curl http://127.0.0.1:9000 Bye! $telnet 127.0.0.1 9000 Trying 192.168.1.21... Connected to 192.168.1.21. Escape character is '^]'. Hi! Bye! Connection closed by foreign host.
注意到curl只處理了一次報(bào)文。
客戶端
const client = new net.Socket(); client.connect(9000, '127.0.0.1', function () { }); client.on('data', (chunk) => { console.log('data', chunk.toString()) //data Hi! //Bye! });
Socket長連接
所謂長連接,指在一個(gè)TCP連接上可以連續(xù)發(fā)送多個(gè)數(shù)據(jù)包,在TCP連接保持期間,如果沒有數(shù)據(jù)包發(fā)送,需要雙方發(fā)檢測包以維持此連接(心跳包),一般需要自己做在線維持。 短連接是指通信雙方有數(shù)據(jù)交互時(shí),就建立一個(gè)TCP連接,數(shù)據(jù)發(fā)送完成后,則斷開此TCP連接。比如Http的,只是連接、請(qǐng)求、關(guān)閉,過程時(shí)間較短,服務(wù)器若是一段時(shí)間內(nèi)沒有收到請(qǐng)求即可關(guān)閉連接。其實(shí)長連接是相對(duì)于通常的短連接而說的,也就是長時(shí)間保持客戶端與服務(wù)端的連接狀態(tài)。
通常的短連接操作步驟是:
連接→數(shù)據(jù)傳輸→關(guān)閉連接;
而長連接通常就是:
連接→數(shù)據(jù)傳輸→保持連接(心跳)→數(shù)據(jù)傳輸→保持連接(心跳)→……→關(guān)閉連接;
什么時(shí)候用長連接,短連接?
長連接多用于操作頻繁,點(diǎn)對(duì)點(diǎn)的通訊,而且連接數(shù)不能太多情況,。每個(gè)TCP連接都需要三步握手,這需要時(shí)間,如果每個(gè)操作都是先連接,再操作的話那么處理 速度會(huì)降低很多,所以每個(gè)操作完后都不斷開,次處理時(shí)直接發(fā)送數(shù)據(jù)包就OK了,不用建立TCP連接。
例如:數(shù)據(jù)庫的連接用長連接, 如果用短連接頻繁的通信會(huì)造成Socket錯(cuò)誤,而且頻繁的Socket創(chuàng)建也是對(duì)資源的浪費(fèi)。
什么是心跳包為什么需要:
心跳包就是在客戶端和服務(wù)端間定時(shí)通知對(duì)方自己狀態(tài)的一個(gè)自己定義的命令字,按照一定的時(shí)間間隔發(fā)送,類似于心跳,所以叫做心跳包。網(wǎng)絡(luò)中的接收和發(fā)送數(shù)據(jù)都是使用Socket進(jìn)行實(shí)現(xiàn)。但是如果此套接字已經(jīng)斷開(比如一方斷網(wǎng)了),那發(fā)送數(shù)據(jù)和接收數(shù)據(jù)的時(shí)候就一定會(huì)有問題。可是如何判斷這個(gè)套接字是否還可以使用呢?這個(gè)就需要在系統(tǒng)中創(chuàng)建心跳機(jī)制。其實(shí)TCP中已經(jīng)為我們實(shí)現(xiàn)了一個(gè)叫做心跳的機(jī)制。如果你設(shè)置了心跳,那TCP就會(huì)在一定的時(shí)間(比如你設(shè)置的是3秒鐘)內(nèi)發(fā)送你設(shè)置的次數(shù)的心跳(比如說2次),并且此信息不會(huì)影響你自己定義的協(xié)議。也可以自己定義,所謂“心跳”就是定時(shí)發(fā)送一個(gè)自定義的結(jié)構(gòu)體(心跳包或心跳幀),讓對(duì)方知道自己“在線”,以確保鏈接的有效性。
實(shí)現(xiàn):服務(wù)端:
const net = require('net'); let clientList = []; const heartbeat = 'HEARTBEAT'; // 定義心跳包內(nèi)容確保和平時(shí)發(fā)送的數(shù)據(jù)不會(huì)沖突 const server = net.createServer(); server.on('connection', (client) => { console.log('客戶端建立連接:', client.remoteAddress + ':' + client.remotePort); clientList.push(client); client.on('data', (chunk) => { let content = chunk.toString(); if (content === heartbeat) { console.log('收到客戶端發(fā)過來的一個(gè)心跳包'); } else { console.log('收到客戶端發(fā)過來的數(shù)據(jù):', content); client.write('服務(wù)端的數(shù)據(jù):' + content); } }); client.on('end', () => { console.log('收到客戶端end'); clientList.splice(clientList.indexOf(client), 1); }); client.on('error', () => { clientList.splice(clientList.indexOf(client), 1); }) }); server.listen(9000); setInterval(broadcast, 10000); // 定時(shí)發(fā)送心跳包 function broadcast() { console.log('broadcast heartbeat', clientList.length); let cleanup = [] for (let i=0;i<clientList.length;i+=1) { if (clientList[i].writable) { // 先檢查 sockets 是否可寫 clientList[i].write(heartbeat); } else { console.log('一個(gè)無效的客戶端'); cleanup.push(clientList[i]); // 如果不可寫,收集起來銷毀。銷毀之前要 Socket.destroy() 用 API 的方法銷毀。 clientList[i].destroy(); } } //Remove dead Nodes out of write loop to avoid trashing loop index for (let i=0; i<cleanup.length; i+=1) { console.log('刪除無效的客戶端:', cleanup[i].name); clientList.splice(clientList.indexOf(cleanup[i]), 1); } }
服務(wù)端輸出結(jié)果:
客戶端建立連接: ::ffff:127.0.0.1:57125
broadcast heartbeat 1
收到客戶端發(fā)過來的數(shù)據(jù): Thu, 29 Mar 2018 03:45:15 GMT
收到客戶端發(fā)過來的一個(gè)心跳包
收到客戶端發(fā)過來的數(shù)據(jù): Thu, 29 Mar 2018 03:45:20 GMT
broadcast heartbeat 1
收到客戶端發(fā)過來的數(shù)據(jù): Thu, 29 Mar 2018 03:45:25 GMT
收到客戶端發(fā)過來的一個(gè)心跳包
客戶端建立連接: ::ffff:127.0.0.1:57129
收到客戶端發(fā)過來的一個(gè)心跳包
收到客戶端發(fā)過來的數(shù)據(jù): Thu, 29 Mar 2018 03:46:00 GMT
收到客戶端發(fā)過來的數(shù)據(jù): Thu, 29 Mar 2018 03:46:04 GMT
broadcast heartbeat 2
收到客戶端發(fā)過來的數(shù)據(jù): Thu, 29 Mar 2018 03:46:05 GMT
收到客戶端發(fā)過來的一個(gè)心跳包
客戶端代碼:
const net = require('net'); const heartbeat = 'HEARTBEAT'; const client = new net.Socket(); client.connect(9000, '127.0.0.1', () => {}); client.on('data', (chunk) => { let content = chunk.toString(); if (content === heartbeat) { console.log('收到心跳包:', content); } else { console.log('收到數(shù)據(jù):', content); } }); // 定時(shí)發(fā)送數(shù)據(jù) setInterval(() => { console.log('發(fā)送數(shù)據(jù)', new Date().toUTCString()); client.write(new Date().toUTCString()); }, 5000); // 定時(shí)發(fā)送心跳包 setInterval(function () { client.write(heartbeat); }, 10000);
客戶端輸出結(jié)果:
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:04 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:04 GMT
收到心跳包: HEARTBEAT
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:09 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:09 GMT
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:14 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:14 GMT
收到心跳包: HEARTBEAT
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:19 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:19 GMT
發(fā)送數(shù)據(jù) Thu, 29 Mar 2018 03:46:24 GMT
收到數(shù)據(jù): 服務(wù)端的數(shù)據(jù):Thu, 29 Mar 2018 03:46:24 GMT
收到心跳包: HEARTBEAT
定義自己的協(xié)議
如果想要使傳輸?shù)臄?shù)據(jù)有意義,則必須使用到應(yīng)用層協(xié)議比如Http、Mqtt、Dubbo等?;赥CP協(xié)議上自定義自己的應(yīng)用層的協(xié)議需要解決的幾個(gè)問題:
- 心跳包格式的定義及處理
- 報(bào)文頭的定義,就是你發(fā)送數(shù)據(jù)的時(shí)候需要先發(fā)送報(bào)文頭,報(bào)文里面能解析出你將要發(fā)送的數(shù)據(jù)長度
- 你發(fā)送數(shù)據(jù)包的格式,是json的還是其他序列化的方式
下面我們就一起來定義自己的協(xié)議,并編寫服務(wù)的和客戶端進(jìn)行調(diào)用:
定義報(bào)文頭格式: length:000000000xxxx
; xxxx代表數(shù)據(jù)的長度,總長度20,舉例子不嚴(yán)謹(jǐn)。
數(shù)據(jù)表的格式: Json
服務(wù)端:
const net = require('net'); const server = net.createServer(); let clientList = []; const heartBeat = 'HeartBeat'; // 定義心跳包內(nèi)容確保和平時(shí)發(fā)送的數(shù)據(jù)不會(huì)沖突 const getHeader = (num) => { return 'length:' + (Array(13).join(0) + num).slice(-13); } server.on('connection', (client) => { client.name = client.remoteAddress + ':' + client.remotePort // client.write('Hi ' + client.name + '!\n'); console.log('客戶端建立連接', client.name); clientList.push(client) let chunks = []; let length = 0; client.on('data', (chunk) => { let content = chunk.toString(); console.log("content:", content, content.length); if (content === heartBeat) { console.log('收到客戶端發(fā)過來的一個(gè)心跳包'); } else { if (content.indexOf('length:') === 0){ length = parseInt(content.substring(7,20)); console.log('length', length); chunks =[chunk.slice(20, chunk.length)]; } else { chunks.push(chunk); } let heap = Buffer.concat(chunks); console.log('heap.length', heap.length) if (heap.length >= length) { try { console.log('收到數(shù)據(jù)', JSON.parse(heap.toString())); let data = '服務(wù)端的數(shù)據(jù)數(shù)據(jù):' + heap.toString();; let dataBuff = Buffer.from(JSON.stringify(data)); let header = getHeader(dataBuff.length) client.write(header); client.write(dataBuff); } catch (err) { console.log('數(shù)據(jù)解析失敗'); } } } }) client.on('end', () => { console.log('收到客戶端end'); clientList.splice(clientList.indexOf(client), 1); }); client.on('error', () => { clientList.splice(clientList.indexOf(client), 1); }) }); server.listen(9000); setInterval(broadcast, 10000); // 定時(shí)檢查客戶端 并發(fā)送心跳包 function broadcast() { console.log('broadcast heartbeat', clientList.length); let cleanup = [] for(var i=0;i<clientList.length;i+=1) { if(clientList[i].writable) { // 先檢查 sockets 是否可寫 // clientList[i].write(heartBeat); // 發(fā)送心跳數(shù)據(jù) } else { console.log('一個(gè)無效的客戶端') cleanup.push(clientList[i]) // 如果不可寫,收集起來銷毀。銷毀之前要 Socket.destroy() 用 API 的方法銷毀。 clientList[i].destroy(); } } // 刪除無效的客戶端 for(i=0; i<cleanup.length; i+=1) { console.log('刪除無效的客戶端:', cleanup[i].name); clientList.splice(clientList.indexOf(cleanup[i]), 1) } }
日志打印:
客戶端建立連接 ::ffff:127.0.0.1:50178
content: length:0000000000031 20
length 31
heap.length 0
content: "Tue, 03 Apr 2018 06:12:37 GMT" 31
heap.length 31
收到數(shù)據(jù) Tue, 03 Apr 2018 06:12:37 GMT
broadcast heartbeat 1
content: HeartBeat 9
收到客戶端發(fā)過來的一個(gè)心跳包
content: length:0000000000031"Tue, 03 Apr 2018 06:12:42 GMT" 51
length 31
heap.length 31
收到數(shù)據(jù) Tue, 03 Apr 2018 06:12:42 GMT
客戶端
const net = require('net'); const client = new net.Socket(); const heartBeat = 'HeartBeat'; // 定義心跳包內(nèi)容確保和平時(shí)發(fā)送的數(shù)據(jù)不會(huì)沖突 const getHeader = (num) => { return 'length:' + (Array(13).join(0) + num).slice(-13); } client.connect(9000, '127.0.0.1', function () {}); let chunks = []; let length = 0; client.on('data', (chunk) => { let content = chunk.toString(); console.log("content:", content, content.length); if (content === heartBeat) { console.log('收到服務(wù)端發(fā)過來的一個(gè)心跳包'); } else { if (content.indexOf('length:') === 0){ length = parseInt(content.substring(7,20)); console.log('length', length); chunks =[chunk.slice(20, chunk.length)]; } else { chunks.push(chunk); } let heap = Buffer.concat(chunks); console.log('heap.length', heap.length) if (heap.length >= length) { try { console.log('收到數(shù)據(jù)', JSON.parse(heap.toString())); } catch (err) { console.log('數(shù)據(jù)解析失敗'); } } } }); // 定時(shí)發(fā)送數(shù)據(jù) setInterval(function () { let data = new Date().toUTCString(); let dataBuff = Buffer.from(JSON.stringify(data)); let header =getHeader(dataBuff.length); client.write(header); client.write(dataBuff); }, 5000); // 定時(shí)發(fā)送心跳包 setInterval(function () { client.write(heartBeat); }, 10000);
日志打?。?/p>
content: length:0000000000060 20
length 60
heap.length 0
content: "服務(wù)端的數(shù)據(jù)數(shù)據(jù):\"Tue, 03 Apr 2018 06:12:37 GMT\"" 44
heap.length 60
收到數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):"Tue, 03 Apr 2018 06:12:37 GMT"
content: length:0000000000060"服務(wù)端的數(shù)據(jù)數(shù)據(jù):\"Tue, 03 Apr 2018 06:12:42 GMT\"" 64
length 60
heap.length 60
收到數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):"Tue, 03 Apr 2018 06:12:42 GMT"
客戶端定時(shí)發(fā)送自定義協(xié)議數(shù)據(jù)到服務(wù)端,先發(fā)送頭數(shù)據(jù),在發(fā)送內(nèi)容數(shù)據(jù),另外一個(gè)定時(shí)器發(fā)送心跳數(shù)據(jù),服務(wù)端判斷是心跳數(shù)據(jù),再判斷是不是頭數(shù)據(jù),再是內(nèi)容數(shù)據(jù),然后解析后再發(fā)送數(shù)據(jù)給客戶端。從日志的打印可以看出客戶端先后write
header
和data
數(shù)據(jù),服務(wù)端可能在一個(gè)data
事件里面接收到。
這里可以看到一個(gè)客戶端在同一個(gè)時(shí)間內(nèi)處理一個(gè)請(qǐng)求可以很好的工作,但是想象這么一個(gè)場景,如果同一時(shí)間內(nèi)讓同一個(gè)客戶端去多次調(diào)用服務(wù)端請(qǐng)求,發(fā)送多次頭數(shù)據(jù)和內(nèi)容數(shù)據(jù),服務(wù)端的data事件收到的數(shù)據(jù)就很難區(qū)別哪些數(shù)據(jù)是哪次請(qǐng)求的,比如兩次頭數(shù)據(jù)同時(shí)到達(dá)服務(wù)端,服務(wù)端就會(huì)忽略其中一次,而后面的內(nèi)容數(shù)據(jù)也不一定就對(duì)應(yīng)于這個(gè)頭的。所以想復(fù)用長連接并能很好的高并發(fā)處理服務(wù)端請(qǐng)求,就需要連接池這種方式了。
Socket連接池
什么是Socket連接池,池的概念可以聯(lián)想到是一種資源的集合,所以Socket連接池,就是維護(hù)著一定數(shù)量Socket長連接的集合。它能自動(dòng)檢測Socket長連接的有效性,剔除無效的連接,補(bǔ)充連接池的長連接的數(shù)量。從代碼層次上其實(shí)是人為實(shí)現(xiàn)這種功能的類,一般一個(gè)連接池包含下面幾個(gè)屬性:
- 空閑可使用的長連接隊(duì)列
- 正在運(yùn)行的通信的長連接隊(duì)列
- 等待去獲取一個(gè)空閑長連接的請(qǐng)求的隊(duì)列
- 無效長連接的剔除功能
- 長連接資源池的數(shù)量配置
- 長連接資源的新建功能
場景: 一個(gè)請(qǐng)求過來,首先去資源池要求獲取一個(gè)長連接資源,如果空閑隊(duì)列里面有長連接,就獲取到這個(gè)長連接Socket,并把這個(gè)Socket移到正在運(yùn)行的長連接隊(duì)列。如果空閑隊(duì)列里面沒有,且正在運(yùn)行的隊(duì)列長度小于配置的連接池資源的數(shù)量,就新建一個(gè)長連接到正在運(yùn)行的隊(duì)列去,如果正在運(yùn)行的不下于配置的資源池長度,則這個(gè)請(qǐng)求進(jìn)入到等待隊(duì)列去。當(dāng)一個(gè)正在運(yùn)行的Socket完成了請(qǐng)求,就從正在運(yùn)行的隊(duì)列移到空閑的隊(duì)列,并觸發(fā)等待請(qǐng)求隊(duì)列去獲取空閑資源,如果有等待的情況。
這里簡單介紹Nodejs的Socket連接池generic-pool模塊的源碼。
主要文件目錄結(jié)構(gòu)
. |————lib ------------------------- 代碼庫 | |————DefaultEvictor.js ---------- | |————Deferred.js ---------------- | |————Deque.js ------------------- | |————DequeIterator.js ----------- | |————DoublyLinkedList.js -------- | |————DoublyLinkedListIterator.js- | |————factoryValidator.js -------- | |————Pool.js -------------------- 連接池主要代碼 | |————PoolDefaults.js ------------ | |————PooledResource.js ---------- | |————Queue.js ------------------- 隊(duì)列 | |————ResourceLoan.js ------------ | |————ResourceRequest.js --------- | |————utils.js ------------------- 工具 |————test ------------------------- 測試目錄 |————README.md ------------------- 項(xiàng)目描述文件 |————.eslintrc ------------------- eslint靜態(tài)檢查配置文件 |————.eslintignore --------------- eslint靜態(tài)檢查忽略的文件 |————package.json ----------------- npm包依賴配置
下面介紹庫的使用:
初始化連接池
'use strict'; const net = require('net'); const genericPool = require('generic-pool'); function createPool(conifg) { let options = Object.assign({ fifo: true, // 是否優(yōu)先使用老的資源 priorityRange: 1, // 優(yōu)先級(jí) testOnBorrow: true, // 是否開啟獲取驗(yàn)證 // acquireTimeoutMillis: 10 * 1000, // 獲取的超時(shí)時(shí)間 autostart: true, // 自動(dòng)初始化和釋放調(diào)度啟用 min: 10, // 初始化連接池保持的長連接最小數(shù)量 max: 0, // 最大連接池保持的長連接數(shù)量 evictionRunIntervalMillis: 0, // 資源釋放檢驗(yàn)間隔檢查 設(shè)置了下面幾個(gè)參數(shù)才起效果 numTestsPerEvictionRun: 3, // 每次釋放資源數(shù)量 softIdleTimeoutMillis: -1, // 可用的超過了最小的min 且空閑時(shí)間時(shí)間 達(dá)到釋放 idleTimeoutMillis: 30000 // 強(qiáng)制釋放 // maxWaitingClients: 50 // 最大等待 }, conifg.options); const factory = { create: function () { return new Promise((resolve, reject) => { let socket = new net.Socket(); socket.setKeepAlive(true); socket.connect(conifg.port, conifg.host); // TODO 心跳包的處理邏輯 socket.on('connect', () => { console.log('socket_pool', conifg.host, conifg.port, 'connect' ); resolve(socket); }); socket.on('close', (err) => { // 先end 事件再close事件 console.log('socket_pool', conifg.host, conifg.port, 'close', err); }); socket.on('error', (err) => { console.log('socket_pool', conifg.host, conifg.port, 'error', err); reject(err); }); }); }, //銷毀連接 destroy: function (socket) { return new Promise((resolve) => { socket.destroy(); // 不會(huì)觸發(fā)end 事件 第一次會(huì)觸發(fā)發(fā)close事件 如果有message會(huì)觸發(fā)error事件 resolve(); }); }, validate: function (socket) { //獲取資源池校驗(yàn)資源有效性 return new Promise((resolve) => { // console.log('socket.destroyed:', socket.destroyed, 'socket.readable:', socket.readable, 'socket.writable:', socket.writable); if (socket.destroyed || !socket.readable || !socket.writable) { return resolve(false); } else { return resolve(true); } }); } }; const pool = genericPool.createPool(factory, options); pool.on('factoryCreateError', (err) => { // 監(jiān)聽新建長連接出錯(cuò) 讓請(qǐng)求直接返回錯(cuò)誤 const clientResourceRequest = pool._waitingClientsQueue.dequeue(); if (clientResourceRequest) { clientResourceRequest.reject(err); } }); return pool; }; let pool = createPool({ port: 9000, host: '127.0.0.1', options: {min: 0, max: 10} });
使用連接池
下面連接池的使用,使用的協(xié)議是我們之前自定義的協(xié)議。
let pool = createPool({ port: 9000, host: '127.0.0.1', options: {min: 0, max: 10} }); const getHeader = (num) => { return 'length:' + (Array(13).join(0) + num).slice(-13); } const request = async (requestDataBuff) => { let client; try { client = await pool.acquire(); } catch (e) { console.log('acquire socket client failed: ', e); throw e; } let timeout = 10000; return new Promise((resolve, reject) => { let chunks = []; let length = 0; client.setTimeout(timeout); client.removeAllListeners('error'); client.on('error', (err) => { client.removeAllListeners('error'); client.removeAllListeners('data'); client.removeAllListeners('timeout'); pool.destroyed(client); reject(err); }); client.on('timeout', () => { client.removeAllListeners('error'); client.removeAllListeners('data'); client.removeAllListeners('timeout'); // 應(yīng)該銷毀以防下一個(gè)req的data事件監(jiān)聽才返回?cái)?shù)據(jù) pool.destroy(client); // pool.release(client); reject(`socket connect timeout set ${timeout}`); }); let header = getHeader(requestDataBuff.length); client.write(header); client.write(requestDataBuff); client.on('data', (chunk) => { let content = chunk.toString(); console.log('content', content, content.length); // TODO 過濾心跳包 if (content.indexOf('length:') === 0){ length = parseInt(content.substring(7,20)); console.log('length', length); chunks =[chunk.slice(20, chunk.length)]; } else { chunks.push(chunk); } let heap = Buffer.concat(chunks); console.log('heap.length', heap.length); if (heap.length >= length) { pool.release(client); client.removeAllListeners('error'); client.removeAllListeners('data'); client.removeAllListeners('timeout'); try { // console.log('收到數(shù)據(jù)', JSON.parse(heap.toString())); resolve(JSON.parse(heap.toString())); } catch (err) { reject(err); console.log('數(shù)據(jù)解析失敗'); } } }); }); } request(Buffer.from(JSON.stringify({a: 'a'}))) .then((data) => { console.log('收到服務(wù)的數(shù)據(jù)',data) }).catch(err => { console.log(err); }); request(Buffer.from(JSON.stringify({b: 'b'}))) .then((data) => { console.log('收到服務(wù)的數(shù)據(jù)',data) }).catch(err => { console.log(err); }); setTimeout(function () { //查看是否會(huì)復(fù)用Socket 有沒有建立新的連接 request(Buffer.from(JSON.stringify({c: 'c'}))) .then((data) => { console.log('收到服務(wù)的數(shù)據(jù)',data) }).catch(err => { console.log(err); }); request(Buffer.from(JSON.stringify({d: 'd'}))) .then((data) => { console.log('收到服務(wù)的數(shù)據(jù)',data) }).catch(err => { console.log(err); }); }, 1000)
日志打印:
socket_pool 127.0.0.1 9000 connect
socket_pool 127.0.0.1 9000 connect
content length:0000000000040"服務(wù)端的數(shù)據(jù)數(shù)據(jù):{\"a\":\"a\"}" 44
length 40
heap.length 40
收到服務(wù)的數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"a":"a"}
content length:0000000000040"服務(wù)端的數(shù)據(jù)數(shù)據(jù):{\"b\":\"b\"}" 44
length 40
heap.length 40
收到服務(wù)的數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"b":"b"}
content length:0000000000040 20
length 40
heap.length 0
content "服務(wù)端的數(shù)據(jù)數(shù)據(jù):{\"c\":\"c\"}" 24
heap.length 40
收到服務(wù)的數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"c":"c"}
content length:0000000000040"服務(wù)端的數(shù)據(jù)數(shù)據(jù):{\"d\":\"d\"}" 44
length 40
heap.length 40
收到服務(wù)的數(shù)據(jù) 服務(wù)端的數(shù)據(jù)數(shù)據(jù):{"d":"d"}
這里看到前面兩個(gè)請(qǐng)求都建立了新的Socket連接 socket_pool 127.0.0.1 9000 connect
,定時(shí)器結(jié)束后重新發(fā)起兩個(gè)請(qǐng)求就沒有建立新的Socket連接了,直接從連接池里面獲取Socket連接資源。
源碼分析
發(fā)現(xiàn)主要的代碼就位于lib文件夾中的Pool.js
構(gòu)造函數(shù):lib/Pool.js
/** * Generate an Object pool with a specified `factory` and `config`. * * @param {typeof DefaultEvictor} Evictor * @param {typeof Deque} Deque * @param {typeof PriorityQueue} PriorityQueue * @param {Object} factory * Factory to be used for generating and destroying the items. * @param {Function} factory.create * Should create the item to be acquired, * and call it's first callback argument with the generated item as it's argument. * @param {Function} factory.destroy * Should gently close any resources that the item is using. * Called before the items is destroyed. * @param {Function} factory.validate * Test if a resource is still valid .Should return a promise that resolves to a boolean, true if resource is still valid and false * If it should be removed from pool. * @param {Object} options */ constructor(Evictor, Deque, PriorityQueue, factory, options) { super(); factoryValidator(factory); // 檢驗(yàn)我們定義的factory的有效性包含create destroy validate this._config = new PoolOptions(options); // 連接池配置 // TODO: fix up this ugly glue-ing this._Promise = this._config.Promise; this._factory = factory; this._draining = false; this._started = false; /** * Holds waiting clients * @type {PriorityQueue} */ this._waitingClientsQueue = new PriorityQueue(this._config.priorityRange); // 請(qǐng)求的對(duì)象管管理隊(duì)列queue 初始化queue的size 1 { _size: 1, _slots: [ Queue { _list: [Object] } ] } /** * Collection of promises for resource creation calls made by the pool to factory.create * @type {Set} */ this._factoryCreateOperations = new Set(); // 正在創(chuàng)建的長連接 /** * Collection of promises for resource destruction calls made by the pool to factory.destroy * @type {Set} */ this._factoryDestroyOperations = new Set(); // 正在銷毀的長連接 /** * A queue/stack of pooledResources awaiting acquisition * TODO: replace with LinkedList backed array * @type {Deque} */ this._availableObjects = new Deque(); // 空閑的資源長連接 /** * Collection of references for any resource that are undergoing validation before being acquired * @type {Set} */ this._testOnBorrowResources = new Set(); // 正在檢驗(yàn)有效性的資源 /** * Collection of references for any resource that are undergoing validation before being returned * @type {Set} */ this._testOnReturnResources = new Set(); /** * Collection of promises for any validations currently in process * @type {Set} */ this._validationOperations = new Set();// 正在校驗(yàn)的中間temp /** * All objects associated with this pool in any state (except destroyed) * @type {Set} */ this._allObjects = new Set(); // 所有的鏈接資源 是一個(gè) PooledResource對(duì)象 /** * Loans keyed by the borrowed resource * @type {Map} */ this._resourceLoans = new Map(); // 被借用的對(duì)象的map release的時(shí)候用到 /** * Infinitely looping iterator over available object * @type {DequeIterator} */ this._evictionIterator = this._availableObjects.iterator(); // 一個(gè)迭代器 this._evictor = new Evictor(); /** * handle for setTimeout for next eviction run * @type {(number|null)} */ this._scheduledEviction = null; // create initial resources (if factory.min > 0) if (this._config.autostart === true) { // 初始化最小的連接數(shù)量 this.start(); } }
可以看到包含之前說的空閑的資源隊(duì)列,正在請(qǐng)求的資源隊(duì)列,正在等待的請(qǐng)求隊(duì)列等。
下面查看 Pool.acquire 方法lib/Pool.js
/** * Request a new resource. The callback will be called, * when a new resource is available, passing the resource to the callback. * TODO: should we add a seperate "acquireWithPriority" function * * @param {Number} [priority=0] * Optional. Integer between 0 and (priorityRange - 1). Specifies the priority * of the caller if there are no available resources. Lower numbers mean higher * priority. * * @returns {Promise} */ acquire(priority) { // 空閑資源隊(duì)列資源是有優(yōu)先等級(jí)的 if (this._started === false && this._config.autostart === false) { this.start(); // 會(huì)在this._allObjects 添加min的連接對(duì)象數(shù) } if (this._draining) { // 如果是在資源釋放階段就不能再請(qǐng)求資源了 return this._Promise.reject( new Error("pool is draining and cannot accept work") ); } // 如果要設(shè)置了等待隊(duì)列的長度且要等待 如果超過了就返回資源不可獲取 // TODO: should we defer this check till after this event loop incase "the situation" changes in the meantime if ( this._config.maxWaitingClients !== undefined && this._waitingClientsQueue.length >= this._config.maxWaitingClients ) { return this._Promise.reject( new Error("max waitingClients count exceeded") ); } const resourceRequest = new ResourceRequest( this._config.acquireTimeoutMillis, // 對(duì)象里面的超時(shí)配置 表示等待時(shí)間 會(huì)啟動(dòng)一個(gè)定時(shí) 超時(shí)了就觸發(fā)resourceRequest.promise 的reject觸發(fā) this._Promise ); // console.log(resourceRequest) this._waitingClientsQueue.enqueue(resourceRequest, priority); // 請(qǐng)求進(jìn)入等待請(qǐng)求隊(duì)列 this._dispense(); // 進(jìn)行資源分發(fā) 最終會(huì)觸發(fā)resourceRequest.promise的resolve(client) return resourceRequest.promise; // 返回的是一個(gè)promise對(duì)象resolve卻是在其他地方觸發(fā) }
/** * Attempt to resolve an outstanding resource request using an available resource from * the pool, or creating new ones * * @private */ _dispense() { /** * Local variables for ease of reading/writing * these don't (shouldn't) change across the execution of this fn */ const numWaitingClients = this._waitingClientsQueue.length; // 正在等待的請(qǐng)求的隊(duì)列長度 各個(gè)優(yōu)先級(jí)的總和 console.log('numWaitingClients', numWaitingClients) // 1 // If there aren't any waiting requests then there is nothing to do // so lets short-circuit if (numWaitingClients < 1) { return; } // max: 10, min: 4 console.log('_potentiallyAllocableResourceCount', this._potentiallyAllocableResourceCount) // 目前潛在空閑可用的連接數(shù)量 const resourceShortfall = numWaitingClients - this._potentiallyAllocableResourceCount; // 還差幾個(gè)可用的 小于零表示不需要 大于0表示需要新建長連接的數(shù)量 console.log('spareResourceCapacity', this.spareResourceCapacity) // 距離max數(shù)量的還有幾個(gè)沒有創(chuàng)建 const actualNumberOfResourcesToCreate = Math.min( this.spareResourceCapacity, // -6 resourceShortfall // 這個(gè)是 -3 ); // 如果resourceShortfall>0 表示需要新建但是這新建的數(shù)量不能超過spareResourceCapacity最多可創(chuàng)建的 console.log('actualNumberOfResourcesToCreate', actualNumberOfResourcesToCreate) // 如果actualNumberOfResourcesToCreate >0 表示需要?jiǎng)?chuàng)建連接 for (let i = 0; actualNumberOfResourcesToCreate > i; i++) { this._createResource(); // 新增新的長連接 } // If we are doing test-on-borrow see how many more resources need to be moved into test // to help satisfy waitingClients if (this._config.testOnBorrow === true) { // 如果開啟了使用前校驗(yàn)資源的有效性 // how many available resources do we need to shift into test const desiredNumberOfResourcesToMoveIntoTest = numWaitingClients - this._testOnBorrowResources.size;// 1 const actualNumberOfResourcesToMoveIntoTest = Math.min( this._availableObjects.length, // 3 desiredNumberOfResourcesToMoveIntoTest // 1 ); for (let i = 0; actualNumberOfResourcesToMoveIntoTest > i; i++) { // 需要有效性校驗(yàn)的數(shù)量 至少滿足最小的waiting clinet this._testOnBorrow(); // 資源有效校驗(yàn)后再分發(fā) } } // if we aren't testing-on-borrow then lets try to allocate what we can if (this._config.testOnBorrow === false) { // 如果沒有開啟有效性校驗(yàn) 就開啟有效資源的分發(fā) const actualNumberOfResourcesToDispatch = Math.min( this._availableObjects.length, numWaitingClients ); for (let i = 0; actualNumberOfResourcesToDispatch > i; i++) { // 開始分發(fā)資源 this._dispatchResource(); } } }
/** * Attempt to move an available resource to a waiting client * @return {Boolean} [description] */ _dispatchResource() { if (this._availableObjects.length < 1) { return false; } const pooledResource = this._availableObjects.shift(); // 從可以資源池里面取出一個(gè) this._dispatchPooledResourceToNextWaitingClient(pooledResource); // 分發(fā) return false; } /** * Dispatches a pooledResource to the next waiting client (if any) else * puts the PooledResource back on the available list * @param {PooledResource} pooledResource [description] * @return {Boolean} [description] */ _dispatchPooledResourceToNextWaitingClient(pooledResource) { const clientResourceRequest = this._waitingClientsQueue.dequeue(); // 可能是undefined 取出一個(gè)等待的quene console.log('clientResourceRequest.state', clientResourceRequest.state); if (clientResourceRequest === undefined || clientResourceRequest.state !== Deferred.PENDING) { console.log('沒有等待的') // While we were away either all the waiting clients timed out // or were somehow fulfilled. put our pooledResource back. this._addPooledResourceToAvailableObjects(pooledResource); // 在可用的資源里面添加一個(gè) // TODO: do need to trigger anything before we leave? return false; } // TODO clientResourceRequest 的state是否需要判斷 如果已經(jīng)是resolve的狀態(tài) 已經(jīng)超時(shí)回去了 這個(gè)是否有問題 const loan = new ResourceLoan(pooledResource, this._Promise); this._resourceLoans.set(pooledResource.obj, loan); // _resourceLoans 是個(gè)map k=>value pooledResource.obj 就是socket本身 pooledResource.allocate(); // 標(biāo)識(shí)資源的狀態(tài)是正在被使用 clientResourceRequest.resolve(pooledResource.obj); // acquire方法返回的promise對(duì)象的resolve在這里執(zhí)行的 return true; }
上面的代碼就按種情況一直走下到最終獲取到長連接的資源,其他更多代碼大家可以自己去深入了解。
以上就是Nodejs Socket連接池及TCP HTTP網(wǎng)絡(luò)模型詳解的詳細(xì)內(nèi)容,更多關(guān)于Nodejs Socket連接池TCP HTTP的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
nodejs body-parser 解析post數(shù)據(jù)實(shí)例
下面小編就為大家?guī)硪黄猲odejs body-parser 解析post數(shù)據(jù)實(shí)例。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-07-07詳解如何在Node.js中執(zhí)行CPU密集型任務(wù)
Node.js通常被認(rèn)為不適合CPU密集型應(yīng)用程序,Node.js的工作原理使其在處理I/O密集型任務(wù)時(shí)大放異彩,雖然執(zhí)行CPU密集型任務(wù)肯定不是Node的主要使用場景,但是我們依舊有方法來改善這些問題,本文詳細(xì)給大家介紹了如何在Node.js中執(zhí)行CPU密集型任務(wù)2023-12-12如何解決安裝websocket還是報(bào)錯(cuò)Cannot find module'ws&apos
這篇文章主要介紹了如何解決安裝websocket還是報(bào)Cannot find module'ws'問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-02-02node.js中優(yōu)雅的使用Socket.IO模塊的方法
Socket.IO是一個(gè)WebSocket庫,包括了客戶端的js和服務(wù)器端的node.js,它的目標(biāo)是構(gòu)建可以在不同瀏覽器和移動(dòng)設(shè)備上使用的實(shí)時(shí)應(yīng)用,這篇文章主要介紹了node.js中優(yōu)雅的使用Socket.IO模塊,需要的朋友可以參考下2022-12-12Node.js折騰記一:讀指定文件夾,輸出該文件夾的文件樹詳解
這篇文章主要介紹了Node.js讀指定文件夾輸出該文件夾文件樹,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04