node.js - Stream.Transform 在完成输出之前接受新输入

标签 node.js stream

我正在使用异步操作实现转换流。我的叫做Parser

var Transform = require('stream').transform;

function Parser(options) {
  Transform.call(this, {objectMode: true});
}

Parser.prototype._transform = function _transform(input, encoding, callback) {
  var this_ = this;
  doSomethingAsync(input, function(output) {
    this_.push(output);
    //possible location #1 for callback();
  });
  //possible location #2 for callback();
}

每个传入 block 可能需要很长时间才能处理(doSomethingAsync 需要网络请求)。然而,每个 block 的处理完全独立于之前的 block 。此外,输出的确切顺序并不重要。每个输出都包含一个描述符,用于标识其输入,而不是按顺序标识。

因此,我希望尽快再次调用_transform,而不是等到给定的 block 完全完成处理。因此,查看代码,如果我将 callback() 放在 可能的位置 #1 中,那么在每个 block 完全完成之前,永远不会调用 _transform处理。但如果我把它放在可能的位置#2,那么我的流会在回调之后推送,这会导致这些难看的结果

Uncaught Error: stream.push() after EOF

流终止后出现错误。

所以我的问题:是否可以使用转换流来做到这一点?或者我应该考虑使用图书馆?如果是,哪种类型(事件流、FRP 等)?

谢谢。

最佳答案

您可以在流上实现 _flush(),并且仅在所有异步函数完成时调用传递给该函数的回调。像这样的事情:

function Parser(options) {
  Transform.call(this, {objectMode: true});

  this._pending = 0;
  this._flushcb = undefined;
}

Parser.prototype._transform = function _transform(input, encoding, callback) {
  var self = this;

  ++this._pending;

  doSomethingAsync(input, function(output) {
    self.push(output);
    if (--self._pending === 0 && self._flushcb)
      self._flushcb();
  });

  callback();
}

Parser.prototype._flush = function(callback) {
  this._flushcb = callback;
};

关于node.js - Stream.Transform 在完成输出之前接受新输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27822067/

相关文章:

javascript - 根据 Node 版本将标志传递给 Node.js 的最佳方法?

javascript - Socket.io 不触发从客户端到服务器的事件

http - Elixir:使用流 API 的 HTTPResponseStream

Java - 将 Web 服务响应 (StreamResult) 转换为 XML 以检索子值

node.js - 使用 DynamoDB 批量更新插入

node.js - 如何在不创建模型的情况下插入对模型的引用?

windows - Yeoman 命令无法在 Windows 7 中运行

java - 在流上应用正则表达式来提取字符串

捕获 stdin、stdout、stderr 上的错误

php - 无法在 simplexml_load_file() 中打开流错误