node.js - 使用 Streams2 创建一个通过子进程传递数据的流

标签 node.js

假设我有一个函数BlackBox。 api 是这样的(其中 | 实际上是管道):

inputStream | BlackBox | outputStream

但是,BlackBox 实际上是 require('child_process').spawn 的包装器,所以它实际上看起来像这样:

inputStream | BlackBox.Writable -> proc.stdin -> proc.stdout -> BlackBox.Readable | outputStream

我可以使用streams1轻松做到这一点,但我想了解streams2以及它如何更好。因此,到目前为止我有以下代码:

var Duplex = require('stream').Duplex
var spawn = require('child_process').spawn
var util = require('util')

util.inherits(BlackBox, Duplex)

function BlackBox () {
  Duplex.call(this)

  // Example process
  this.proc = spawn('convert', ['-', ':-'])

  var that = this
  this.proc.stdout.on('end', function () {
    that.push(null)
  })
}

BlackBox.prototype._write = function (chunk, encoding, callback) {
  return this.proc.stdin.write(chunk, encoding, callback)
}

BlackBox.prototype.end = function (chunk, encoding, callback) {
  return this.proc.stdin.end(chunk, encoding, callback)
}

BlackBox.prototype._read = function (size) {
  var that = this

  this.proc.stdout.on('readable', function () {
    var chunk = this.read(size)
    if (chunk === null)
      that.push('')
    else
      that.push(chunk)
  })
}

我在这里做错了什么吗?

我主要关心的是关于read._read(size)文档的以下摘录:

When data is available, put it into the read queue by calling readable.push(chunk). If push returns false, then you should stop reading. When _read is called again, you should start pushing more data.

如何“停止阅读”?

要明确的是,我希望处理背压和节流。

最佳答案

isaacs 基本上做了一个例子:https://github.com/isaacs/duplex-passthrough

关于node.js - 使用 Streams2 创建一个通过子进程传递数据的流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17420984/

相关文章:

node.js - Mongoose 中的 Model.findOne() 和 Model.findById() 有什么区别?

javascript - 如何在nodejs模块中访问私有(private)代码?

node.js - Buffer.byteLength() 被窃听

node.js - 是否可以在 winston 日志消息中使用级别值而不是名称?

node.js - 我如何使用 Loopback.io 设置 SMTP

javascript - 这样可以用角色验证用户吗?

node.js - 在 openshift 上部署时的 Mongodb 和 Loopback

javascript - 如何在不重新启动应用程序的情况下清除所有用户的快速 session ?

javascript - 从对象数组中删除重复项

javascript - MongoDB 有问题的 JSON prop/field