node.js中TCP Socket多进程间的消息推送示例详解

前言

前段时间接到了一个支付中转服务的需求,即支付数据通过http接口传到中转服务器,中转服务器将支付数据发送到异构后台(Lua)的指定tcp socket。

一开始评估的时候感觉蛮简单的,就是http server和tcp server间的通信,不是一个Event实例就能解决的状态管理问题吗?注册一个事件A用于消息传递,在socket连接时注册唯一的ID,然后在http接收到数据时,emit事件A;在监听到事件A时,在tcp server中寻找指定ID对应的socket处理该数据即可。

尽管node.js在高并发方面有不错的性能,但是单个tcp server实例的承载能力有限,为避免服务器过载,node.js 单进程的内存有上限(认2G),能容纳的长连接客户端数不多。但随着业务的扩大,我们需要考虑多机集群部署,客户端可以连接到任一节点,并发送消息。如何做到多节点的同时推送,我们需要建立一套多节点之间的消息分发/订阅架构。常用的第三方消息管理库有 RabbitMQ和Redis等。在这里,我用的是Redis订阅发布服务。

redis.io有一个比较成熟的redis消息中转库)。但我们项目中异构后台用到的并非websocket,而是原生的TCP原生的Socket。用原生redis的sub/pubs实现并不难,就手写了。

redis在该项目中主要起到一个消息分发中心(publish/subscribe)的作用。当http请求的支付数据发送过来时,则通过redis的publish功能往所有的channel推送消息,这样所有订阅该channel的socket server就能收到回调,然后推送到指定客户端。在应用层看跟Event事件消息的处理差不多。

rush:js;"> const redis = require("redis"),redisClient = redis.createClient,REdis_CFG = { host: '127.0.0.1',port: 6379 },sub = redisClient(REdis_CFG),pub = redisClient(REdis_CFG),PAY_MQ_CHANNEL = 'pay_mq_channel';

// 监听频道的消息回调
sub.on('message',function(channel,message) {
switch (channle){
case PAY_MQ_CHANNEL:
console.log('notification received:',message);

// 广播消息到指定socket

break;
}
});
// 订阅频道
sub.subscribe(PAY_MQ_CHANNEL);

// 当接收到支付数据时,推送频道消息
pub.publish(PAY_MQ_CHANNEL,{id: '01',msg: hello ${PAY_MQ_CHANNEL}!});

由于redis的sub/pub的channel订阅数有上限,所以建议一类消息使用一个channel,一个channel下使用map、set或数组来存储订阅时的回调函数,在接收到订阅消息时遍历执行回调函数

下面是我封装好的Redis组件(RedisMQProxy.js):

rush:js;"> /* * redis 订阅/发布 */ const _ = require('lodash'),redis = require("redis"),pub = redisClient(REdis_CFG);

let SubListenerFuns = {}; // channel的回调函数列表

let RedisMQProxy = {

// 订阅channel
on(channel,cb,errorCb,once = false) {
sub.subscribe(channel); // 订阅channel消息

// 将回调函数存放数组中
SubListenerFuns[channel] = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
SubListenerFuns[channel].push({
once,errorCb
});
},// 监听一次性的channel回调函数
once(channel,errorCb) {
this.on(channel,true);
},// 发送channel消息
emit(channel,message) {
if(!_.isstring(message)) {
message = JSON.stringify(message);
}
pub.publish(channel,message);
},// 移除channel上的监听函数
removeListener(channel,func) {
let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
for(let i = 0,l = channelHandlers.length; i < l; i++) {
let handler = channelHandlers[i] || {};
let cb = handler.cb;
if(func && func == cb) {
channelHandlers.splice(i,1);
return false;
}
}
}
};

RedisMQProxy.SubListeners = SubListenerFuns;

pub.on('error',onError);
sub.on('error',onError);

// 监听redis订阅消息
sub.on("message",message) {
// 遍历执行channel的回调函数
try {
message = JSON.parse(message);
} catch(e) {}
broadcasttochannel(channel,message);
});

// 广播消息到指定频道
function broadcasttochannel(channel,message,isError) {
let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
for(let i = 0,l = channelHandlers.length; i < l; i++) {
let handler = channelHandlers[i] || {};
let isOnce = handler.once || false;
let func = handler.cb;
let errorFunc = handler.errorCb;

.isFunction(func) && func(message);
isError &&
.isFunction(errorFunc) && errorFunc(message);

isOnce && channelHandlers.splice(i,1); // 移除一次性监听的函数
}
}

function broadcastToAllChannels(message,isError) {
for(let channel in SubListenerFuns) {
broadcasttochannel(channel,isError);
}
}

function onError(err) {
err = err || {};
err.msg = err.msg || 'redis sub/pub fail';

// 通知所有channel执行错误回调函数
broadcastToAllChannels(err,true);
}

module.exports = RedisMQProxy;

在使用时就可以比较方便地调用了:

rush:js;"> const RedisMQProxy = require('./RedisMQProxy'),PAY_MQ_CHANNEL = 'pay_mq_channel';

// 订阅channel
RedisMQ.on(PAY_MQ_CHANNEL,function(message) {
console.log('notification received:',message);
// 广播消息到指定socket
// ...
});

// 订阅一次性的channel
RedisMQ.once(PAY_MQ_CHANNEL,function(message) {
// ...
});

// 当接收到支付数据时,推送频道消息
RedisMQ.emit(PAY_MQ_CHANNEL,msg: hello ${PAY_MQ_CHANNEL}!});

目前该项目已经健康运行了一个多月。由于socket server的多进程间消息推送依赖于redis的消息中转,而Redis使用的是单进程,未能充分利用cpu。当业务膨胀的时候,redis就要考虑分布集群了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对编程之家的支持

相关文章

这篇文章主要介绍“基于nodejs的ssh2怎么实现自动化部署”的...
本文小编为大家详细介绍“nodejs怎么实现目录不存在自动创建...
这篇“如何把nodejs数据传到前端”文章的知识点大部分人都不...
本文小编为大家详细介绍“nodejs如何实现定时删除文件”,内...
这篇文章主要讲解了“nodejs安装模块卡住不动怎么解决”,文...
今天小编给大家分享一下如何检测nodejs有没有安装成功的相关...