node.js - 将数据管道传输到尚未准备好接收数据的可写流

标签 node.js node-streams

有没有办法在 Node.js 中将可读流连接到可写流,而可写流还没有准备好接收数据?换句话说,我想将可读与可写联系起来,但我想在程序的稍后时间初始化可写,包括定义写入方法。也许我们必须实现 write 方法,但是有没有一种方法可以像暂停可读流那样暂停可写流?或者,也许我们可以使用一个中间直通/转换流并在那里缓冲数据,然后再将数据通过管道传输到可写对象!

举个例子,通常我们会这样做:

readable.pipe(transform).pipe(writable);

但我想做类似的事情:

const tstrm = readable.pipe(transform);

doSomethingAsync().then(function(){

      tstrm.pipe(writable);

});

只是想知道这是否可能以及如何正确地做到这一点,到目前为止还无法弄清楚这两个问题。

我想我希望在中间转换流中缓冲数据,然后再将其连接/传输到可写流,然后,一旦连接,在任何新数据之前首先流式传输缓冲数据。似乎是一件合理的事情,找不到任何相关信息。

最佳答案

请注意,我在这里使用间隔来模拟作者是否能够阅读。您可以按照您想要的任何方式执行此操作,即如果编写器返回 false,您将更新状态以开始缓冲等。我认为最后一行是您想要的,即

r.pipe(b).pipe(w);

内容如下

readStrem.pipe(transformBbuffer).pipe(writeStream);

示例代码,我们可以做一些更改来缓冲所有数据。我将在代码后描述。您需要了解的有关流的所有信息以及更多内容都在文档中,我认为他们可以用更完整的示例来做,但它们已经相当不错了……

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

这是代码。

var fs     = require('fs');
var stream = require('stream')
const util = require('util');
//const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
var check_buff  = 0;
var DRAIN_ME    = 0;

var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');

var BufferStream = function () {
  stream.Transform.apply(this, arguments);
  this.buffer = []; 
};

util.inherits(BufferStream, stream.Transform);

var intId;
intId = setInterval(function(){
  if(check_buff % 3 == 0) {
    DRAIN_ME = 1;
    return;
  }
  DRAIN_ME = 0;
},10);  

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.buffer.push(String(chunk));
  while(DRAIN_ME > 0 && this.buffer.length > 0) {
    this.push(this.buffer.shift());
  }
  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
};

var b = new BufferStream();
b.on('end', function(chunk) {
  clearInterval(intId);
});
r.pipe(b).pipe(w);

I am looking for the canonical way to implement a transform/through stream, that buffers all data until pipe is call on it.

进行如下修改

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.buffer.push(String(chunk));

  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
};
......
BufferStream.prototype._flush = function (cb) {
  var len = this.buffer.length;
  for (var i = 0; i < len; i++) {
    this.push(this.buffer.shift());
  };
  cb();
};

您还可以暂停可读流,这实际上会暂停可写流,因为它停止接收数据,即...

要对此进行测试,请在磁盘上创建一个相当大的文件,即 100MB 或更大,然后运行此...

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  var ready = 0;
  readableStream.pause();
  setInterval(function(){
    if(ready == 0) {
      //console.log('pausing');
      readableStream.pause();
      ready = 1;
    }   
    else {
      //console.log('resuming');
      readableStream.resume();
      ready = 0;
    }   
  },100);  
  writableStream.write(chunk);
});

立即暂停的原因是因为当间隔触发 10ms 时,文件可能已经被写入。这个有变化,即...

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');

var ready = 0;
setInterval(function(){
  if(ready == 0) {
    //console.log('pausing');
    readableStream.pause();
    ready = 1;
  }
  else {
    //console.log('resuming');
    readableStream.resume();
    ready = 0;
  }
},100);  

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
  readableStream.pause();
});

关于node.js - 将数据管道传输到尚未准备好接收数据的可写流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36564892/

相关文章:

javascript - 如何在 puppeteer 中执行 __doPostBack 函数?

javascript - 如何将事件发送到 gulp-tap 的父流中

node.js - 使用 Node js 进行 MP3 文件直播

node.js - 为什么我的第一个查询很慢但很快?

javascript - 如何在循环中保存对象的 'previous' 和 'next' 迭代?

node.js - GeoNear 与 Mongoose

javascript - 尝试停止流 : TypeError: _stream. abort 不是一个函数?

node.js - 从文件中序列化 6 个导入模型

node.js - 如何从字符串创建文件并通过 FormData 发送它

node.js - 通过stream.end()后重新打开写入流