node.js中TCP Socket多進(jìn)程間的消息推送示例詳解
前言
前段時(shí)間接到了一個(gè)支付中轉(zhuǎn)服務(wù)的需求,即支付數(shù)據(jù)通過(guò)http接口傳到中轉(zhuǎn)服務(wù)器,中轉(zhuǎn)服務(wù)器將支付數(shù)據(jù)發(fā)送到異構(gòu)后臺(tái)(Lua)的指定tcp socket。

一開始評(píng)估的時(shí)候感覺(jué)蠻簡(jiǎn)單的,就是http server和tcp server間的通信,不是一個(gè)Event實(shí)例就能解決的狀態(tài)管理問(wèn)題嗎?注冊(cè)一個(gè)事件A用于消息傳遞,在socket連接時(shí)注冊(cè)唯一的ID,然后在http接收到數(shù)據(jù)時(shí),emit事件A;在監(jiān)聽到事件A時(shí),在tcp server中尋找指定ID對(duì)應(yīng)的socket處理該數(shù)據(jù)即可。
盡管node.js在高并發(fā)方面有不錯(cuò)的性能,但是單個(gè)tcp server實(shí)例的承載能力有限,為避免服務(wù)器過(guò)載,node.js 單進(jìn)程的內(nèi)存有上限(默認(rèn)2G),能容納的長(zhǎng)連接客戶端數(shù)不多。但隨著業(yè)務(wù)的擴(kuò)大,我們需要考慮多機(jī)集群部署,客戶端可以連接到任一節(jié)點(diǎn),并發(fā)送消息。如何做到多節(jié)點(diǎn)的同時(shí)推送,我們需要建立一套多節(jié)點(diǎn)之間的消息分發(fā)/訂閱架構(gòu)。常用的第三方消息管理庫(kù)有 RabbitMQ和Redis等。在這里,我用的是Redis的訂閱發(fā)布服務(wù)。
redis.io有一個(gè)比較成熟的redis消息中轉(zhuǎn)庫(kù)socket.io-redis (本地下載)。但我們項(xiàng)目中異構(gòu)后臺(tái)用到的并非websocket,而是原生的TCP原生的Socket。用原生redis的sub/pubs實(shí)現(xiàn)并不難,就手寫了。
redis在該項(xiàng)目中主要起到一個(gè)消息分發(fā)中心(publish/subscribe)的作用。當(dāng)http請(qǐng)求的支付數(shù)據(jù)發(fā)送過(guò)來(lái)時(shí),則通過(guò)redis的publish功能往所有的channel推送消息,這樣所有訂閱該channel的socket server就能收到回調(diào),然后推送到指定客戶端。在應(yīng)用層看跟Event事件消息的處理差不多。
const redis = require("redis"),
redisClient = redis.createClient,
REDIS_CFG = {
host: '127.0.0.1',
port: 6379
},
sub = redisClient(REDIS_CFG),
pub = redisClient(REDIS_CFG),
PAY_MQ_CHANNEL = 'pay_mq_channel';
// 監(jiān)聽頻道的消息回調(diào)
sub.on('message', function(channel, message) {
switch (channle){
case PAY_MQ_CHANNEL:
console.log('notification received:', message);
// 廣播消息到指定socket
break;
}
});
// 訂閱頻道
sub.subscribe(PAY_MQ_CHANNEL);
// 當(dāng)接收到支付數(shù)據(jù)時(shí),推送頻道消息
pub.publish(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});
由于redis的sub/pub的channel訂閱數(shù)有上限,所以建議一類消息使用一個(gè)channel,一個(gè)channel下使用map、set或數(shù)組來(lái)存儲(chǔ)訂閱時(shí)的回調(diào)函數(shù),在接收到訂閱消息時(shí)遍歷執(zhí)行回調(diào)函數(shù)。
下面是我封裝好的Redis組件(RedisMQProxy.js):
/*
* redis 訂閱/發(fā)布
*/
const _ = require('lodash'),
redis = require("redis"),
REDIS_CFG = {
host: '127.0.0.1',
port: 6379
},
sub = redisClient(REDIS_CFG),
pub = redisClient(REDIS_CFG);
let SubListenerFuns = {}; // channel的回調(diào)函數(shù)列表
let RedisMQProxy = {
// 訂閱channel
on(channel, cb, errorCb, once = false) {
sub.subscribe(channel); // 訂閱channel消息
// 將回調(diào)函數(shù)存放數(shù)組中
SubListenerFuns[channel] = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
SubListenerFuns[channel].push({
once, cb, errorCb
});
},
// 監(jiān)聽一次性的channel回調(diào)函數(shù)
once(channel, cb, errorCb) {
this.on(channel, cb, errorCb, true);
},
// 發(fā)送channel消息
emit(channel, message) {
if(!_.isString(message)) {
message = JSON.stringify(message);
}
pub.publish(channel, message);
},
// 移除channel上的監(jiān)聽函數(shù)
removeListener(channel, func) {
let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
for(let i = 0, l = channelHandlers.length; i < l; i++) {
let handler = channelHandlers[i] || {};
let cb = handler.cb;
if(func && func == cb) {
channelHandlers.splice(i, 1);
return false;
}
}
}
};
RedisMQProxy.SubListeners = SubListenerFuns;
pub.on('error', onError);
sub.on('error', onError);
// 監(jiān)聽redis的訂閱消息
sub.on("message", function(channel, message) {
// 遍歷執(zhí)行channel的回調(diào)函數(shù)
try {
message = JSON.parse(message);
} catch(e) {}
broadcastToChannel(channel, message);
});
// 廣播消息到指定頻道
function broadcastToChannel(channel, message, isError) {
let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
for(let i = 0, l = channelHandlers.length; i < l; i++) {
let handler = channelHandlers[i] || {};
let isOnce = handler.once || false;
let func = handler.cb;
let errorFunc = handler.errorCb;
_.isFunction(func) && func(message);
isError && _.isFunction(errorFunc) && errorFunc(message);
isOnce && channelHandlers.splice(i, 1); // 移除一次性監(jiān)聽的函數(shù)
}
}
function broadcastToAllChannels(message, isError) {
for(let channel in SubListenerFuns) {
broadcastToChannel(channel, message, isError);
}
}
function onError(err) {
err = err || {};
err.msg = err.msg || 'redis sub/pub fail';
// 通知所有channel執(zhí)行錯(cuò)誤回調(diào)函數(shù)
broadcastToAllChannels(err, true);
}
module.exports = RedisMQProxy;
在使用時(shí)就可以比較方便地調(diào)用了:
const RedisMQProxy = require('./RedisMQProxy'),
PAY_MQ_CHANNEL = 'pay_mq_channel';
// 訂閱channel
RedisMQ.on(PAY_MQ_CHANNEL, function(message) {
console.log('notification received:', message);
// 廣播消息到指定socket
// ...
});
// 訂閱一次性的channel
RedisMQ.once(PAY_MQ_CHANNEL, function(message) {
// ...
});
// 當(dāng)接收到支付數(shù)據(jù)時(shí),推送頻道消息
RedisMQ.emit(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});
目前該項(xiàng)目已經(jīng)健康運(yùn)行了一個(gè)多月。由于socket server的多進(jìn)程間消息推送依賴于redis的消息中轉(zhuǎn),而Redis使用的是單進(jìn)程,未能充分利用CPU。當(dāng)業(yè)務(wù)膨脹的時(shí)候,redis就要考慮分布集群了。
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
Node.js開發(fā)者必須了解的4個(gè)JS要點(diǎn)
這篇文章主要介紹了Node.js開發(fā)者必須了解的4個(gè)JS要點(diǎn),Node.js是一個(gè)面向服務(wù)器的框架,立足于Chrome強(qiáng)大的V8 JS引擎。盡管它由C++編寫而成,但是它及其應(yīng)用是運(yùn)行在JS上的,需要的朋友可以參考下2016-02-02
nodejs做個(gè)爬蟲爬取騰訊動(dòng)漫內(nèi)容簡(jiǎn)單實(shí)現(xiàn)
這篇文章主要為大家介紹了nodejs做個(gè)爬蟲爬取騰訊動(dòng)漫內(nèi)容簡(jiǎn)單實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07
Ubuntu 16.04 64位中搭建Node.js開發(fā)環(huán)境教程
如果想要在Ubuntu 16.04上安裝Node.js的話,這篇文章對(duì)你來(lái)說(shuō)肯定很重要。Node.js從本質(zhì)上來(lái)說(shuō)就是一個(gè)運(yùn)行在服務(wù)端上的封裝好了輸入輸出流的javascript程序。本文給大家詳細(xì)介紹了在Ubuntu 16.04 64位搭建Node.js開發(fā)環(huán)境的步驟,有需要的朋友們可以參考學(xué)習(xí)。2016-10-10
VS?Code擴(kuò)展Code?Runner?MCP?Server來(lái)了
韓老師介紹MCP協(xié)議和其在AI領(lǐng)域的重要性,并分享了自己開發(fā)的CodeRunnerMCPServer和YoemanGenerator?for?MCP,這些工具可以幫助開發(fā)者更方便地使用MCP協(xié)議2025-05-05
node連接MySQL數(shù)據(jù)庫(kù)的3種方式總結(jié)
現(xiàn)在前端基本上都會(huì)用一些NodeJs,想必也想自己寫一些API或者個(gè)人博客的后臺(tái)系統(tǒng),這些就離不開連接數(shù)據(jù)庫(kù)的問(wèn)題,下面這篇文章主要給大家介紹了關(guān)于node連接MySQL數(shù)據(jù)庫(kù)的3種方式,需要的朋友可以參考下2022-08-08
nodejs命令行參數(shù)處理模塊commander使用實(shí)例
這篇文章主要介紹了nodejs命令行參數(shù)處理模塊commander使用實(shí)例,commander是一個(gè)非常高大上的令行參數(shù)處理模塊,需要的朋友可以參考下2014-09-09
nodeJs項(xiàng)目在阿里云的簡(jiǎn)單部署
這篇文章主要為大家詳細(xì)介紹了nodeJs項(xiàng)目在阿里云的簡(jiǎn)單部署,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-11-11

