node.js - Socket.io、cluster、Express 与事件同步

标签 node.js express socket.io cluster-computing

一周以来,我一直被一个大问题困扰。我尝试把目前在单核上运行的 Node.js 项目改造成使用 cluster 的多核模式。

使用 websockets 时,事件没有任何问题;但使用 xhr-polling 或 jsonp-polling 时,集群模式下的 socket.io 就会出现严重问题。

这是我的服务器配置:

00-generic.js

'use strict';

var http            = require('http'),
    os              = require('os'),
    cluster         = require('cluster');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    if (cluster.isMaster) {
        console.info('Creating HTTP server cluster with %d workers', size);

        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        http.createServer(app).listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};

03-socket.io.js

"use strict";
var _               = require('underscore'),
    socketio        = require('socket.io'),
    locomotive      = require('locomotive'),
    RedisStore      = require("socket.io/lib/stores/redis"),
    redis           = require("socket.io/node_modules/redis"),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket'),
    config          = require(__dirname + '/../app/global'),
    cluster         = require('cluster');

module.exports = function () {
    if (!cluster.isMaster) {
        this.io = socketio.listen(this.server);

        var pub             = redis.createClient(),
            sub             = redis.createClient(),
            client          = redis.createClient();

        this.io.enable('browser client minification');  // send minified client
        this.io.enable('browser client etag');          // apply etag caching logic based on version number
        this.io.enable('browser client gzip');          // gzip the file

        this.io.set("store", new RedisStore({
            redisPub        : pub,
            redisSub        : sub,
            redisClient     : client
        }));
        this.io.set('log level', 2);
        this.io.set('transports', [
            'websocket',
            'jsonp-polling'
        ]);
        this.io.set('close timeout', 24*60*60);
        this.io.set('heartbeat timeout', 24*60*60);

        this.io.sockets.on('connection', function (socket) {
            console.log('connected with ' + this.io.transports[socket.id].name);

            // partie v1 @deprecated
            v1.events(socket);

            // partie v1.1 refaite
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));
    }
};

使用轮询时,客户端会不时连接到与注册监听器的进程不同的另一个进程上。服务器通过 emit 向客户端发送消息时,也存在同样的问题。

经过一番搜索,我发现需要为 socket.io 配置一个 store 来共享连接数据。因此,我按照文档配置了 socket.io 的 RedisStore,但即便如此,事件仍然无法可靠送达,并且我仍然收到以下错误消息:

warn: client not handshaken client should reconnect

编辑

现在不再出现这个警告了。我把 RedisStore 换成了 socket.io-clusterhub,但事件并不总是被触发。有时轮询请求似乎被另一个 worker 接收,而不是注册了监听器的那个 worker,所以什么也没有发生。这是新的配置:

'use strict';

var http            = require('http'),
    locomotive      = require('locomotive'),
    os              = require('os'),
    cluster         = require('cluster'),
    config          = require(__dirname + '/../app/global'),
    _               = require('underscore'),
    socketio        = require('socket.io'),
    v1              = require(__dirname + '/../app/socket.io/v1'),
    sockets         = require(__dirname + '/../../app/socket/socket');

module.exports = function(done) {
    var app = this.express,
        port = process.env.PORT || 3000,
        address = '0.0.0.0';

    if(this.env == 'test'){
        port = 3030;
    }

    var self = this;
    var size = os.cpus().length;

    this.clusterStore = new (require('socket.io-clusterhub'));

    if (cluster.isMaster) {
        for (var i = 0; i < size; ++i) {
            console.log('spawning worker process %d', (i + 1));
            cluster.fork();
        }

        cluster.on('fork', function(worker) {
            console.log('worker %s spawned', worker.id);
        });
        cluster.on('online', function(worker) {
            console.log('worker %s online', worker.id);
        });
        cluster.on('listening', function(worker, addr) {
            console.log('worker %s listening on %s:%d', worker.id, addr.address, addr.port);
        });
        cluster.on('disconnect', function(worker) {
            console.log('worker %s disconnected', worker.id);
        });
        cluster.on('exit', function(worker, code, signal) {
            console.log('worker %s died (%s)', worker.id, signal || code);
            if (!worker.suicide) {
                console.log('restarting worker');
                cluster.fork();
            }
        });
    } else {
        var server = http.createServer(app);

        this.io = socketio.listen(server);

        this.io.configure(function() {
            this.io.enable('browser client minification');  // send minified client
            this.io.enable('browser client etag');          // apply etag caching logic based on version number
            this.io.enable('browser client gzip');          // gzip the file

            this.io.set('store', this.clusterStore);
            this.io.set('log level', 2);
            this.io.set('transports', [
                'websocket',
                'jsonp-polling'
            ]);
            //this.io.set('close timeout', 24*60*60);
            //this.io.set('heartbeat timeout', 24*60*60);
        }.bind(this));

        this.io.sockets.on('connection', function (socket) {
            console.log('connected with ' + this.io.transports[socket.id].name);
            console.log('connected to worker: ' + cluster.worker.id);

            // partie v1 @deprecated
            v1.events(socket);

            // partie v1.1 refaite
            _.each(sockets['1.1'], function(Mod) {
                var mod = new Mod();
                mod.launch({
                    socket  : socket,
                    io      : this.io
                });
            }, this);

        }.bind(this));

        server.listen(port, address, function() {
            var addr = this.address();
            console.log('listening on %s:%d', addr.address, addr.port);
            self.server = this;
            done();
        });
    }
};

最佳答案

来自该来源:http://socket.io/docs/using-multiple-nodes/

If you plan to distribute the load of connections among different processes or machines, you have to make sure that requests associated with a particular session id connect to the process that originated them.

This is due to certain transports like XHR Polling or JSONP Polling relying on firing several requests during the lifetime of the “socket”.

每次都将连接路由到同一个 worker:

粘性 session

在 socket.io 文档中,这是每次都将请求路由到同一个 worker 的推荐方式。

https://github.com/indutny/sticky-session

A simple performant way to use socket.io with a cluster.

Socket.io is doing multiple requests to perform handshake and establish connection with a client. With a cluster those requests may arrive to different workers, which will break handshake protocol.

// The package name is 'sticky-session'; the misspelled 'sticky-sesion'
// cannot be resolved by require().
var sticky = require('sticky-session');

// The callback builds the HTTP server and returns it to sticky(), which then
// listens on port 3000.
sticky(function() {
  // This code will be executed only in slave workers

  var http = require('http'),
      io = require('socket.io');

  var server = http.createServer(function(req, res) {
    // ....
  });
  // Attach socket.io to the worker's HTTP server.
  io.listen(server);

  return server;
}).listen(3000, function() {
  console.log('server started on 3000 port');
});

在 Node 之间传递消息:

socket.io-redis

在 socket.io 文档中,这是在各个 worker 之间共享消息的推荐方式。

https://github.com/automattic/socket.io-redis

By running socket.io with the socket.io-redis adapter you can run multiple socket.io instances in different processes or servers that can all broadcast and emit events to and from each other.

socket.io-redis 就是这样使用的:

// Start a socket.io server on port 3000.
var createSocketServer = require('socket.io');
var io = createSocketServer(3000);

// Plug in the socket.io-redis adapter, pointed at Redis on localhost:6379.
var createRedisAdapter = require('socket.io-redis');
var adapterOptions = { host: 'localhost', port: 6379 };
io.adapter(createRedisAdapter(adapterOptions));

还有

我认为您没有使用 socket.io v1.0.0。您可能需要更新您的版本以获得更高的稳定性。

您可以在 http://socket.io/docs/migrating-from-0-9/ 查看他们的迁移指南

关于 node.js - Socket.io、cluster、Express 与事件同步,我们在 Stack Overflow 上找到一个类似的问题: https://stackoverflow.com/questions/23886054/

相关文章:

javascript - socket.io 客户端在长 Node.js 函数中自动断开连接

javascript - sequelize 缓存常见请求

node.js - 获取 PDFKit 作为 base64 字符串

javascript - Node : Auto-catch errors in Promise and call reject

node.js - 让 SASS 从特定文件(自动)编译到不同的文件夹中

javascript - socket.io 未发送到客户端

node.js - npm install 停止工作

angularjs - ionic View 不刷新来自 API 的数据

node.js - Express Validator 的值无效

express - 使用express.js + Passport.js 进行 session