javascript - Node 流: Wait until data is available

标签 javascript node.js node-streams

我有 3 个流(A、B、C),它们通过管道输送到另一个流(A->B->C)。 当我启动程序时,B 的 _read get 会立即被调用,因为它通过管道传输到 C。但是,当 A 异步获取数据时,B 流中还没有数据。一旦 B 接收到传递给 B 的 _write 方法的数据,它就会转换数据并发出一个“可读”事件(我手动触发该事件 - 这是假定的执行方式吗?)。

但是什么也没有发生,并且来自 B 的数据没有被任何人消耗(因此 B 的 _read 没有被调用)。我可以通过在 _write() 方法末尾调用(在 B 上) this._read() 来解决此问题。但是,尽管队列已满,这也可能将数据推送给消费者,对吗?

基本上我想将较大的数据 block 发送到B流中,将其分割成较小的数据,然后将它们一一传递给C。所以我想在 B 中有某种缓冲区。

_read(size) {
    if(this._lineBuffer.length > 0) {
        var stop = false;
        while(!stop) {
            stop = this.push(this._lineBuffer.splice(0,1));
        }
    }
    if(this._pendingWriteAck) {
        this._pendingWriteAck();
        this._pendingWriteAck = null;
    }
}

_write(chunk, encoding, callback) {
    console.log("New chunk for line splitter received");
    if(this._buffer) {
        this._buffer = Buffer.concat([this._buffer, chunk]);
    } else {
        this._buffer = chunk;
    }
    for (; this._offset < this._buffer.length; this._offset++) {
        if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
            this._lineBuffer.push(this._buffer.slice(0, this._offset).toString());
            this._buffer = this._buffer.slice(this._offset + 1);
            this._offset = 0;
        }
    }

    if(this._lineBuffer.length > this._maxLineBuffer || this._buffer.length > this._maxRawBuffer) {
        console.log("Line Split buffer has reached capacity. Waiting...");
        this._pendingWriteAck = callback;
    } else {
        callback();
    }

    setImmediate(()=>{
        this.emit('readable');
        this._read();
    })
}

最佳答案

您可以将转换流用于“B”流:

const Transform = require('stream').Transform;

const B = new Transform({
  transform(chunk, encoding, callback) {
    this._offset = this._offset || 0;
    this._buffer = this._buffer ? Buffer.concat([this._buffer, chunk]) : chunk
    for (; this._offset < this._buffer.length; this._offset++) {
      if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
        if (this._offset) {
          this.push(this._buffer.slice(0, this._offset), encoding);
        }
        this._buffer = this._buffer.slice(this._offset + 1);
        this._offset = 0;
      }
    }
    callback()
  },
  flush(callback) {
    if (this._buffer && this._buffer.length) {
      this.push(this._buffer);
    }
    callback();
  }
});

您可以通过执行以下操作来查看它的工作情况:

let line = 0
B.on('data', (chunk) => process.stdout.write(`${++line}. ${chunk}\n`))
B.write(['Foo', 'Bar', 'Baz', 'Qux', 'Hello '].join('\n'))
B.write(['World', 'The End'].join('\n'))
B.end()

终端的输出将是:

1. Foo
2. Bar
3. Baz
4. Qux
5. Hello World
6. The End

关于javascript - Node 流: Wait until data is available,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43192126/

相关文章:

javascript - onClick() Javascript 函数无法被识别,ReferenceError : function is not defined, 使用文档 Ready

javascript - 将 Dropzone.js 与其他表单字段集成到 html 表单中

javascript - 如何在 NodeJS 中创建一对多转换流?

javascript - 发现 js php 加载/执行问题

Javascript - 创建可迭代抽象层

node.js - docker中服务器和数据库之间的查询非常慢

node.js - Next js 生产构建是否需要服务器中的 node_modules 文件夹?

Node.js:当子进程在对其标准输入流进行大量写入操作期间突然关闭时,会引发无法捕获的错误

node.js:在将流传输到最终目的地之前确定流的长度

javascript - 如何告诉屏幕阅读器阅读切换元素