node.js中TCP Socket多进程间的消息推送示例详解

前言

前段时间接到了一个支付中转服务的需求,即支付数据通过http接口传到中转服务器,中转服务器将支付数据发送到异构后台(Lua)的指定tcp socket。

一开始评估的时候感觉蛮简单的,就是http server和tcp server间的通信,不是一个Event实例就能解决的状态管理问题吗?注册一个事件A用于消息传递,在socket连接时注册唯一的ID,然后在http接收到数据时,emit事件A;在监听到事件A时,在tcp server中寻找指定ID对应的socket处理该数据即可。

尽管node.js在高并发方面有不错的性能,但是单个tcp server实例的承载能力有限,为避免服务器过载,node.js 单进程的内存有上限(默认2G),能容纳的长连接客户端数不多。但随着业务的扩大,我们需要考虑多机集群部署,客户端可以连接到任一节点,并发送消息。如何做到多节点的同时推送,我们需要建立一套多节点之间的消息分发/订阅架构。常用的第三方消息管理库有 RabbitMQ和Redis等。在这里,我用的是Redis的订阅发布服务。

redis.io有一个比较成熟的redis消息中转库socket.io-redis (本地下载)。但我们项目中异构后台用到的并非websocket,而是原生的TCP原生的Socket。用原生redis的sub/pubs实现并不难,就手写了。

redis在该项目中主要起到一个消息分发中心(publish/subscribe)的作用。当http请求的支付数据发送过来时,则通过redis的publish功能往所有的channel推送消息,这样所有订阅该channel的socket server就能收到回调,然后推送到指定客户端。在应用层看跟Event事件消息的处理差不多。

const redis = require("redis"),
 redisClient = redis.createClient,
 REDIS_CFG = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisClient(REDIS_CFG),
 pub = redisClient(REDIS_CFG),
 PAY_MQ_CHANNEL = 'pay_mq_channel';

// 监听频道的消息回调
sub.on('message', function(channel, message) {
 switch (channle){
  case PAY_MQ_CHANNEL:
   console.log('notification received:', message);

   // 广播消息到指定socket

   break;
 }
});
// 订阅频道
sub.subscribe(PAY_MQ_CHANNEL);

// 当接收到支付数据时,推送频道消息
pub.publish(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

由于redis的sub/pub的channel订阅数有上限,所以建议一类消息使用一个channel,一个channel下使用map、set或数组来存储订阅时的回调函数,在接收到订阅消息时遍历执行回调函数。

下面是我封装好的Redis组件(RedisMQProxy.js):

/*
 * redis 订阅/发布
 */
const _ = require('lodash'),
 redis = require("redis"),
 REDIS_CFG = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisClient(REDIS_CFG),
 pub = redisClient(REDIS_CFG);

let SubListenerFuns = {}; // channel的回调函数列表

let RedisMQProxy = {

 // 订阅channel
 on(channel, cb, errorCb, once = false) {
  sub.subscribe(channel); // 订阅channel消息

  // 将回调函数存放数组中
  SubListenerFuns[channel] = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
  SubListenerFuns[channel].push({
   once, cb, errorCb
  });
 },

 // 监听一次性的channel回调函数
 once(channel, cb, errorCb) {
  this.on(channel, cb, errorCb, true);
 },

 // 发送channel消息
 emit(channel, message) {
  if(!_.isString(message)) {
   message = JSON.stringify(message);
  }
  pub.publish(channel, message);
 },

 // 移除channel上的监听函数
 removeListener(channel, func) {
  let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
  for(let i = 0, l = channelHandlers.length; i < l; i++) {
   let handler = channelHandlers[i] || {};
   let cb = handler.cb;
   if(func && func == cb) {
    channelHandlers.splice(i, 1);
    return false;
   }
  }
 }
};

RedisMQProxy.SubListeners = SubListenerFuns;

pub.on('error', onError);
sub.on('error', onError);

// 监听redis的订阅消息
sub.on("message", function(channel, message) {
 // 遍历执行channel的回调函数
 try {
  message = JSON.parse(message);
 } catch(e) {}
 broadcastToChannel(channel, message);
});

// 广播消息到指定频道
function broadcastToChannel(channel, message, isError) {
 let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
 for(let i = 0, l = channelHandlers.length; i < l; i++) {
  let handler = channelHandlers[i] || {};
  let isOnce = handler.once || false;
  let func = handler.cb;
  let errorFunc = handler.errorCb;

  _.isFunction(func) && func(message);
  isError && _.isFunction(errorFunc) && errorFunc(message);

  isOnce && channelHandlers.splice(i, 1); // 移除一次性监听的函数
 }
}

function broadcastToAllChannels(message, isError) {
 for(let channel in SubListenerFuns) {
  broadcastToChannel(channel, message, isError);
 }
}

function onError(err) {
 err = err || {};
 err.msg = err.msg || 'redis sub/pub fail';

 // 通知所有channel执行错误回调函数
 broadcastToAllChannels(err, true);
}

module.exports = RedisMQProxy;

在使用时就可以比较方便地调用了:

const RedisMQProxy = require('./RedisMQProxy'),
 PAY_MQ_CHANNEL = 'pay_mq_channel';

// 订阅channel
RedisMQ.on(PAY_MQ_CHANNEL, function(message) {
 console.log('notification received:', message);
 // 广播消息到指定socket
 // ...
});

// 订阅一次性的channel
RedisMQ.once(PAY_MQ_CHANNEL, function(message) {
 // ...
});

// 当接收到支付数据时,推送频道消息
RedisMQ.emit(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

目前该项目已经健康运行了一个多月。由于socket server的多进程间消息推送依赖于redis的消息中转,而Redis使用的是单进程,未能充分利用CPU。当业务膨胀的时候,redis就要考虑分布集群了。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

时间: 2018-07-08

nodejs基础之多进程实例详解

本文实例讲述了nodejs基础之多进程.分享给大家供大家参考,具体如下: Node.js 多进程 我们都知道 Node.js 是以单线程的模式运行的,但它使用的是事件驱动来处理并发,这样有助于我们在多核 cpu 的系统上创建多个子进程,从而提高性能. 每个子进程总是带有三个流对象:child.stdin, child.stdout 和child.stderr.他们可能会共享父进程的 stdio 流,或者也可以是独立的被导流的流对象. Node 提供了 child_process 模块来创建子进程

Node.js中child_process实现多进程

复制代码 代码如下: var http = require('http'); function fib (n) {     if (n < 2) {         return 1;     } else {         return fib(n - 2) + fib(n - 1);     } } var server = http.createServer(function (req, res) {     var num = parseInt(req.url.substring(1)

深入理解NodeJS 多进程和集群

进程和线程 "进程" 是计算机系统进行资源分配和调度的基本单位,我们可以理解为计算机每开启一个任务就会创建至少一个进程来处理,有时会创建多个,如 Chrome 浏览器的选项卡,其目的是为了防止一个进程挂掉而应用停止工作,而 "线程" 是程序执行流的最小单元,NodeJS 默认是单进程.单线程的,我们将这个进程称为主进程,也可以通过 child_process 模块创建子进程实现多进程,我们称这些子进程为 "工作进程",并且归主进程管理,进程之间默

node.js使用cluster实现多进程

首先郑重声明: nodeJS 是一门单线程!异步!非阻塞语言! nodeJS 是一门单线程!异步!非阻塞语言! nodeJS 是一门单线程!异步!非阻塞语言! 重要的事情说3遍. 因为nodeJS天生自带buff, 所以从一出生就受到 万千 粉丝的追捧(俺,也是它的死忠). 但是,傻逼php 竟然嘲笑 我大NodeJS 的性能. 说不稳定,不可靠,只能利用单核CPU. 辣鸡 nodeJS. 艹!艹!艹! 搞mo shi~ 但,大哥就是大哥,nodeJS在v0.8 的时候就已经加入了cluster

Node.js中多进程模块Cluster的介绍与使用

前言 我们都知道nodejs最大的特点就是单进程.无阻塞运行,并且是异步事件驱动的.Nodejs的这些特性能够很好的解决一些问题,例如在服务器开发中,并发的请求处理是个大问题,阻塞式的函数会导致资源浪费和时间延迟.通过事件注册.异步函数,开发人员可以提高资源的利用率,性能也会改善.既然Node.js采用单进程.单线程模式,那么在如今多核硬件流行的环境中,单核性能出色的Nodejs如何利用多核CPU呢?创始人Ryan Dahl建议,运行多个Nodejs进程,利用某些通信机制来协调各项任务.目前,已

Nodejs中解决cluster模块的多进程如何共享数据问题

前述 nodejs在v0.6.x之后增加了一个模块cluster用于实现多进程,利用child_process模块来创建和管理进程,增加程序在多核CPU机器上的性能表现.本文将介绍利用cluster模块创建的多线程如何共享数据的问题. 进程间数据共享 首先举个简单的例子,代码如下: var cluster = require('cluster'); var data = 0;//这里定义数据不会被所有进程共享,各个进程有各自的内存区域 if (cluster.isMaster) { //主进程

nodejs中解决异步嵌套循环和循环嵌套异步的问题

众所周知,nodejs异步和循环对于初学者来说是一个很大的问题,今天我们就一起来了解和解决它 当异步和循环同时出现的时候这个问题就会被放大很多倍. 庆幸的是,大神们研究出了async这个第三方模块,解决了node中异步金字塔和循环问题 async这个模块应该是nodejs中使用最多的第三方模块,每个月下载量3000W+ async基础使用 : http://www.jb51.net/article/118526.htm 异步嵌套循环,循环嵌套异步的例子: var async = require(

Node.js中的cluster模块深入解读

预备知识 在如今机器的CPU都是多核的背景下,Node的单线程设计已经没法更充分的"压榨"机器性能了.所以从v0.8开始,Node新增了一个内置模块--"cluster",故名思议,它可以通过一个父进程管理一坨子进程的方式来实现集群的功能. 学习cluster之前,需要了解process相关的知识,如果不了解的话建议先阅读process模块.child_process模块. cluster借助child_process模块的fork()方法来创建子进程,通过fork

Nodejs中使用captchapng模块生成图片验证码

Nodejs项目,在做图片验证码的时候遇到了难题.Nodejs没有图片库,以后会有,但是现在没有. 网络上搜索一圈,有几个解决方案: 1.采用第三方验证码程序,有的时候,项目可能不允许: 2.使用Java或者PHP生成图片,Nodejs调用,中间采用Redies共享: 这两种方式都不太理想,好在终于找到了可以支持Nodejs图片验证码的一个库,虽然只支持数字,但是也还不错.原理是使用Base64的图片编码方式. 这个库的Gighub地址是:https://github.com/GeorgeCha

使用nodeJS中的fs模块对文件及目录进行读写,删除,追加,等操作详解

fs概述 文件 I/O 是由简单封装的标准 POSIX 函数提供的. nodeJS中通过 require('fs') 使用fs模块. 所有的方法都有异步和同步的形式. 异步形式始终以完成回调作为它最后一个参数. 传给完成回调的参数取决于具体方法,但第一个参数总是留给异常. 如果操作成功完成,则第一个参数会是 null 或 undefined //异步示例 var fs = require('fs'); fs.unlink('/tmp/hello', function(err){ if (err)

NodeJs中的VM模块详解

什么是VM? VM模块是NodeJS里面的核心模块,支撑了require方法和NodeJS的运行机制,我们有些时候可能也要用到VM模板来做一些特殊的事情. 通过VM,JS可以被编译后立即执行或者编译保存下来稍后执行(JavaScript code can be compiled and run immediately or compiled, saved, and run later.) VM模块包含了三个常用的方法,用于创建独立运行的沙箱体制,如下三个方法 vm.runInThisContex

详解Python中logging日志模块在多进程环境下的使用

前言 相信每位程序员应该都知道,在使用 Python 来写后台任务时,时常需要使用输出日志来记录程序运行的状态,并在发生错误时将错误的详细信息保存下来,以别调试和分析.Python 的 logging 模块就是这种情况下的好帮手. logging 模块可以指定日志的级别,DEBUG.INFO.WARNING.ERROR.CRITICAL,例如可以在开发和调试时,把 DEBUG 以上级别的日志都输出,而在生产环境下,只输出 INFO 级别.(如果不特别指定,默认级别是 warning) loggi

深入学习nodejs中的async模块的使用方法

最近在学习nodejs,这两天学习了async模块这个地方知识点挺多的,所以,今天添加一点小笔记. async模块是为了解决嵌套金字塔,和异步流程控制而生.常用的方法介绍 npm 安装好async模块,然后引入就可以使用 var async = require('async'); 1. series(tasks,[callback]) 多个函数从上到下依次执行,相互之间没有数据交互 var task1 =function(callback){ console.log("task1");

详解通过源码解析Node.js中cluster模块的主要功能实现

众所周知,Node.js中的JavaScript代码执行在单线程中,非常脆弱,一旦出现了未捕获的异常,那么整个应用就会崩溃.这在许多场景下,尤其是web应用中,是无法忍受的.通常的解决方案,便是使用Node.js中自带的cluster模块,以master-worker模式启动多个应用实例.然而大家在享受cluster模块带来的福祉的同时,不少人也开始好奇: 为什么我的应用代码中明明有app.listen(port);,但cluter模块在多次fork这份代码时,却没有报端口已被占用? Maste

Nodejs中使用phantom将html转为pdf或图片格式的方法

最近在项目中遇到需要把html页面转换为pdf的需求,并且转换成的pdf文件要保留原有html的样式和图片.也就是说,html页面的图片.表格.样式等都需要完整的保存下来. 最初找到三种方法来实现这个需求,这三种方法都只是粗浅的看了使用方法,从而找出适合这个需求的方案: html-pdf 模块 wkhtmltopdf 工具 phantom 模块 最终使用了phantom模块,也达到了预期效果.现在简单的记录三种方式的使用方法,以及三者之间主要的不同之处. 1.html-pdf github:ht