我刚刚创建了一个简单的可读可写流对,可以使用 pipeline() 连接。我对创建反压并控制 Readable 中读取发生的速率感兴趣。然而,我对如何实际实现这个或者是否可以使用 Node.js 流有点困惑。 举个例子:
const {Writable, Readable} = require('stream');
function getWritable() {
return new Writable({
write: function (chunk, encoding, cb) {
console.log(' => chunk => ', String(chunk));
setTimeout(cb, 1500);
}
});
}
function getReadable(data) {
return new Readable({
encoding: 'utf8',
objectMode: false,
read: function (n) {
// n => Number(16384)
console.log('read is called');
const d = data.shift();
this.push(d ? String(d) : null);
}
});
}
const readableStrm = getReadable([1, 2, 3, 4, 5]);
const piped = readableStrm.pipe(getWritable());
piped.on('finish', function () {
console.log('finish');
});
如果运行上面的代码,我们将看到“调用 read”将被记录 5 次,远早于 Writable 中的 write 方法看到数据。
我想做的是仅当 Writable 中的 write 方法触发其回调时才调用 Readable 中的 read()
;当然,read() 方法必须首先触发一次,但随后会等待可写对象准备就绪。
有没有办法控制 read()
方法何时以某种方式在可读中触发?
最终,我真的不明白read()
方法的目的是什么。
举一个简单的例子,无论我从 read() 返回什么,我都无法让它停止读取。 read 方法的意义是什么?为什么我们必须实现它?
const Readable = require('stream').Readable;
const r = new Readable({
objectMode: true,
read: function (n) {
console.log('is read');
return false/null/true; // nothing I return here makes a difference
}
});
r.on('data', function (d) {
console.log(d);
});
setInterval(function(){
r.push('valid');
},1000);
最佳答案
Node.js 流非常强大,可以对缓冲中的缓冲和通过它们的数据流提供大量控制。
现在回答您的问题:
A) 您会看到所有数据首先被读取,然后写入被触发,因为您的数据流非常小。如果您使用千字节数据进行测试,您将看到读取流和写入流按顺序触发。该顺序取决于读取流的缓冲容量和写入流创建的背压。例如,TCP 套接字读取流将比磁盘文件写入流快得多,从而产生背压。
B) 用于读写流的一个强大的构造函数选项是 highWaterMark
。您可以在 documentation 的缓冲部分阅读更多内容。 。 highWaterMark
专门定义了可读/可写流的缓冲能力。默认值为 16kb。在上面的示例中,您可以将 highWaterMark 设置为 2 个字节的可读流,如下所示,您将看到差异(在实际情况中不需要,但您可以用于学习)。
function getReadable(data) {
let i = 0;
return new stream.Readable({
highWaterWark: 2, // <--- highWaterMark set to 2 byte. Preferably set it to 10 and increase the length of your input array.
encoding: 'utf8',
objectMode: false,
read: function (n) {
// n => Number(16384)
console.log('read is called');
const d = data[i++];
this.push(d ? String(d) : null);
}
});
}
较小的 highWaterMark
值很快就会产生背压,并且对于读取网络数据等某些用例可能会不利。
C) 您还可以控制读取流和写入流中的数据流。如果您的应用程序需要它,那么您可以控制可写流中的可读流。具体方法readable.pause()
, readable.read([size])
, readable.resume()
, readable.push(chunk[, encoding])
和 readable.unpipe([destination])
允许您控制可读流中的数据缓冲和流(甚至来自可写流)。事实上,您甚至可以使用方法readable.unshift(chunk)
将数据从可写流推回到可读流。 。 有类似的方法可以控制可写流中的数据。
D) read
和 write
方法是流实现的一部分。这些方法用于将流数据发送到底层资源,不应直接从应用程序数据中调用。基本上,它定义了流的设置。 (不确定我是否能够清楚地解释这一点)。
我强烈建议您阅读 Node.js documentation on streams 。它将为您提供大量信息(比您从其他各种网站上找到的示例代码获得的信息还要多)。
希望以上信息对您有所帮助。
关于javascript - 在 Node.js 流中全程实现背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41648504/