node.js - NodeJS - 响应流

标签 node.js sockets sails.js rxjs

我使用 Sails.js 通过 NodeJS 构建了一个简单的 API 端点。

当有人访问我的 API 端点时,服务器开始等待数据,每当出现新数据时,他就会使用套接字广播它。每个客户端都应该根据其用户输入接收自己的数据流。

var Cap = require('cap').Cap;

collect: function (req, res) {

var iface = req.param("ip");

var c = new Cap(),
device = Cap.findDevice(ip);

    c.on('data', function(myData) {
        sails.sockets.blast('message', {"host": myData});
    });
});

响应未完成(我从不发送 res.json() - 实际发生的是浏览器继续加载 - 但上述功能有效)。

2 个问题:

  • 我正在尝试从我的客户端订阅和取消订阅此 API 端点(使用 RxJS)。当我订阅时,我开始通过套接字接收数据 - 但我无法取消订阅 API 端点(浏览器期望请求完成)。

  • 每个客户端应根据请求 IP 参数订阅自己的套接字空间(请参阅更新的代码)。目前,它向所有人发布了该消息。

如何使用 Sails.js 创建一个类似流/服务的 API 端点,该端点将根据每个用户的输入向其发送新数据?

我的目标是能够从每个客户端订阅/取消订阅此 API 端点。

最佳答案

修改后的答案

假设您的 API 端点在 config/routes.js 中定义如下:

...
'get     /collect': 'SomeController.collectSubscribe',
'delete  /collect': 'SomeController.collectUnsubscribe',

由于每个 Cap 实例都绑定(bind)到一台设备,因此我们需要为每个订阅一个实例。我们不使用 sails join/leave 方法,而是跟踪内存中的 Cap 实例,然后广播到请求套接字的 id。这是可行的,因为 Sails 套接字默认订阅它们自己的 id。

api/controllers/SomeController.js中:

// In order for the `Cap` instances to persist after `collectSubscribe` finishes, we store them all in an Object, associated with which socket the were created for.
var caps = {/* req.socket.id: <instance of Cap>, */};

module.exports = {

...

  collectSubscribe: function(req, res) {
    if (!res.isSocket) return res.badRequest("I need a websocket! Help!");
    if (!!caps[req.socket.id]) return res.badRequest("Dude, you are already subscribed.");

    caps[req.socket.id] = new Cap();
    var c = caps[req.socket.id]; // remember that `c` is a reference to our new `Cap`, not a copy.
    var device = c.findDevice(req.param('ip'));

    c.open(device, ...);
    c.on('data', function(myData) {
      sails.sockets.broadcast(req.socket.id, 'message', {host: myData});
    });

    return res.ok();
  },

  collectUnsubscribe: function(req, res) {
    if (!res.isSocket) return res.badRequest("I need a websocket! Help!");
    if (!caps[req.socket.id]) return res.badRequest("I can't unsubscribe you unless you actually subscribe first.");

    caps[req.socket.id].removeAllListeners('data');
    delete caps[req.socket.id]; 

    return res.ok();
  }
}

基本上,它是这样的:当浏览器请求触发 collectSubscribe 时,一个新的 Cap 实例会监听提供的 IP。当浏览器触发 collectUnsubscribe 时,服务器会检索该 Cap 实例,告诉它停止监听,然后将其删除。

生产注意事项:请注意,Cap列表不是持久的(因为它存储在内存中而不是数据库中)!因此,如果您的服务器关闭并重新启动(由于雷暴等),列表将被清除,但考虑到所有 websocket 连接无论如何都会被删除,我认为没有必要担心这一点。

旧答案,仅供引用

您可以使用 sails.sockets.join(req, room)sails.sockets.leave(req, room) 来管理套接字房间。本质上,您有一个名为“collect”的房间,只有加入该房间的套接字才会收到 sails.sockets.broadcast(room, eventName, data)

有关如何使用 sails.sockets 的更多信息 here .

api/controllers/SomeController.js中:

collectSubscribe: function(req, res) {
  if (!res.isSocket) return res.badRequest();

  sails.sockets.join(req, 'collect');
  return res.ok();

},

collectUnsubscribe: function(req, res) {
  if (!res.isSocket) return res.badRequest();

  sails.sockets.leave(req, 'collect');
  return res.ok();
}

最后,我们需要告诉服务器向我们的'collect'房间广播消息。 请注意,这只需要发生一次,因此您可以在 config/ 目录下的文件中执行此操作。

对于这个例子,我将其放入config/sockets.js

module.exports = {
  // ...
};


c.on('data', function(myData) {
  var eventName = 'message';
  var data = {host: myData};
  sails.sockets.broadcast('collect', eventName, data);
});

我假设可以在这里访问c;如果没有,您可以将其定义为 sails.c = ... 以使其可全局访问。

关于node.js - NodeJS - 响应流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45661974/

相关文章:

python - 使用 socket.send(packet, (ip, port)) 时,socket.error errno=10022

python - 如何在 python 程序中关闭 Ctrl-C 上的套接字连接

Javascript DateTime 与 MySQL 日期和时区

sails.js - 什么是 'info: transport end (undefined)' ?在 Sails.js 中

node.js - 使用 "npm publish"失败

javascript - 使用Node.js,如何查看域名是否被注册?

javascript - 将三元运算符转换为 if/else 语句

node.js - 如何强制node.js pm2基于ipv4运行

java - 远程主机强行关闭连接 - C# 客户端、Java 服务器

json - sails .js : compression doesn’t seem to work on json