javascript - 将服务器发送的事件与 Express 一起使用

标签 javascript node.js express server-sent-events

我会尝试使其尽可能简单,这样我就不必发布大量代码。这是我的应用程序现在所做的:

  • 用户从浏览器上传音频文件
  • 该文件在我的服务器上进行处理,此过程需要一些时间,大约需要 8 个步骤才能完成。
  • 一切完成后,用户会在浏览器中收到流程已完成的反馈。

我想添加的是,在完成过程中的每个步骤之后,将一些数据发送回服务器。例如:“您的文件已上传”、“元数据已处理”、“图像已提取”等,以便用户获得有关正在发生的情况的增量反馈,我相信服务器发送事件可以帮助我做到这一点。

目前,文件通过 app.post('/api/track', upload.single('track'), audio.process) 发布到服务器。 audio.process 是所有魔法发生的地方,并使用 res.send() 将数据发送回浏览器。非常典型。

在尝试让事件正常工作时,我已经实现了此功能

app.get('/stream', function(req, res) {
  res.sseSetup()

  for (var i = 0; i < 5; i++) {
    res.sseSend({count: i})
  }
})

当用户从服务器上传文件时,我只需调用此路由并在客户端使用此函数注册所有必要的事件:

progress : () => {
if (!!window.EventSource) {
  const source = new EventSource('/stream')

  source.addEventListener('message', function(e) {
    let data = JSON.parse(e.data)
    console.log(e);
  }, false)

  source.addEventListener('open', function(e) {
    console.log("Connected to /stream");
  }, false)

  source.addEventListener('error', function(e) {
    if (e.target.readyState == EventSource.CLOSED) {
      console.log("Disconnected from /stream");
    } else if (e.target.readyState == EventSource.CONNECTING) {
      console.log('Connecting to /stream');
    }
  }, false)
} else {
  console.log("Your browser doesn't support SSE")
}
}

这按预期工作,当我上传轨道时,我会得到从 0-4 计数的事件流。太棒了!

我的问题:如何将相关消息从 audio.process 路由发送到 /stream 路由,以便消息可以与正在发生的事情相关。 audio.process 必须是 POST,并且 /stream 必须是带有 header 的 GET >“内容类型”:“文本/事件流”。从 audio.process 中发出 GET 请求似乎有点奇怪,但这是最好的方法吗?

任何和所有的建议/提示都值得赞赏!如果您需要更多信息,请告诉我。

最佳答案

新答案:

只要使用socket.io,它就更容易、更好! https://www.npmjs.com/package/socket.io#in-conjunction-with-express

基本设置:

const express = require('express');
const PORT = process.env.PORT || 5000;
const app = express();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
// listen to socket connections
io.on('connection', function(socket){
  // get that socket and listen to events
  socket.on('chat message', function(msg){
    // emit data from the server
    io.emit('chat message', msg);
  });
});
// Tip: add the `io` reference to the request object through a middleware like so:
app.use(function(request, response, next){
  request.io = io;
  next();
});
server.listen(PORT);
console.log(`Listening on port ${PORT}...`);

并且在任何路由处理程序中,您可以使用socket.io:

app.post('/post/:post_id/like/:user_id', function likePost(request, response) {
  //...
  request.io.emit('action', 'user liked your post');
})

客户端:

<script src="/socket.io/socket.io.js"></script>
<script src="https://code.jquery.com/jquery-1.11.1.js"></script>
<script>
  $(function () {
    var socket = io();
    $('form').submit(function(e){
      e.preventDefault(); // prevents page reloading
      socket.emit('chat message', $('#m').val());
      $('#m').val('');
      return false;
    });
    socket.on('chat message', function(msg){
      $('#messages').append($('<li>').text(msg));
    });
  });
</script>

完整示例:https://socket.io/get-started/chat/

原始答案

有人(用户: https://stackoverflow.com/users/451634/benny-neugebauer | 来自这篇文章: addEventListener on custom object )字面上给了我一个提示,告诉我如何在没有除 Express 之外的任何其他包的情况下实现这一点!我已经成功了!

首先导入Node的EventEmitter:

const EventEmitter = require('events');

然后创建一个实例:

const Stream = new EventEmitter();

然后创建事件流的 GET 路由:

app.get('/stream', function(request, response){
  response.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  Stream.on("push", function(event, data) {
    response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
  });
});

在此 GET 路由中,您将写回请求为 200 OK、内容类型为文本/事件流、无缓存且保持事件状态。

您还将调用 EventEmitter 实例的 .on 方法,该方法采用 2 个参数:要监听的事件字符串和处理该事件的函数(该函数可以采用给定的参数数量) )

现在......发送服务器事件所需要做的就是调用 EventEmitter 实例的 .emit 方法:

Stream.emit("push", "test", { msg: "admit one" });

第一个参数是要触发的事件的字符串(请确保与 GET 路由中的事件相同)。 .emit 方法的每个后续参数都将传递给监听器的回调!

就是这样!

由于您的实例是在路由定义之上的范围内定义的,因此您可以从任何其他路由调用 .emit 方法:

app.get('/', function(request, response){
  Stream.emit("push", "test", { msg: "admit one" });
  response.render("welcome.html", {});
});

由于 JavaScript 作用域的工作原理,您甚至可以将该 EventEmitter 实例传递给其他函数,甚至可以从其他模块传递:

const someModule = require('./someModule');

app.get('/', function(request, response){
  someModule.someMethod(request, Stream)
  .then(obj => { return response.json({}) });
});

在某些模块中:

function someMethod(request, Stream) { 
  return new Promise((resolve, reject) => { 
    Stream.emit("push", "test", { data: 'some data' });
    return resolve();
  }) 
}

就这么简单!不需要其他包!

这里是 Node 的 EventEmitter 类的链接:https://nodejs.org/api/events.html#events_class_eventemitter

我的例子:

const EventEmitter = require('events');
const express = require('express');
const app = express();

const Stream = new EventEmitter(); // my event emitter instance

app.get('/stream', function(request, response){
  response.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  Stream.on("push", function(event, data) {
    response.write("event: " + String(event) + "\n" + "data: " + JSON.stringify(data) + "\n\n");
  });
});

setInterval(function(){
  Stream.emit("push", "test", { msg: "admit one" });
}, 10000)

更新:

我创建了一个更易于使用且不会导致内存泄漏的模块/文件!

const Stream = function() {
  var self = this;

  // object literal of connections; IP addresses as the key
  self.connections = {};

  self.enable = function() {
    return function(req, res, next) {
      res.sseSetup = function() {
        res.writeHead(200, {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        })
      }

      res.sseSend = function(id, event, data) {
        var stream = "id: " + String(id) + "\n" +
        "event: " + String(event) + "\n" +
        "data: " + JSON.stringify(data) +
        "\n\n";

        // console.log(id, event, data, stream);

        res.write(stream);
      }

      next()
    }
  }

  self.add = function(request, response) {
    response.sseSetup();
    var ip = String(request.ip);
    self.connections[ip] = response;
  }.bind(self);

  self.push_sse = function(id, type, obj) {
    Object.keys(self.connections).forEach(function(key){
      self.connections[key].sseSend(id, type, obj);
    });
  }.bind(self);

}

/*
  Usage:
  ---
  const express = require('express');
  const Stream = require('./express-eventstream');
  const app = express();
  const stream = new Stream();
  app.use(stream.enable());
  app.get('/stream', function(request, response) {
    stream.add(request, response);
    stream.push_sse(1, "opened", { msg: 'connection opened!' });
  });
  app.get('/test_route', function(request, response){
    stream.push_sse(2, "new_event", { event: true });
    return response.json({ msg: 'admit one' });
  });
*/

module.exports = Stream;

脚本位于此处 - https://github.com/ryanwaite28/script-store/blob/master/js/express-eventstream.js

关于javascript - 将服务器发送的事件与 Express 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48131448/

相关文章:

javascript - 通过 Webpack 加载 FontAwesome : Fonts do not load

javascript - 在mongodb中查询小于今天的日期

node.js - ExpressJS/PassportJS : Authentication vs. session

javascript - 为什么 Angular 总是重定向到主页?

node.js - Express React cookie 问题

express - 用于生产缺失节点模块包的 Nrwl Nx 构建

javascript - 在 Angular2/TypeScript 中找不到模型的命名空间错误

javascript - 惯用的 Javascript 编码风格——何时在原型(prototype)上声明函数与在函数构造函数内声明函数

javascript - 如何使用 JavaScript 给元素 "animation-name"属性?

node.js - 如何使用express、sequelize、mocha和supertest进行回滚单元测试