javascript - 在 Node.js 流中全程实现背压

标签 javascript node.js

我刚刚创建了一个简单的可读可写流对,可以使用 pipeline() 连接。我对创建反压并控制 Readable 中读取发生的速率感兴趣。然而,我对如何实际实现这个或者是否可以使用 Node.js 流有点困惑。 举个例子:

   const {Writable, Readable} = require('stream');

    function getWritable() {
        return new Writable({
            write: function (chunk, encoding, cb) {
                console.log(' => chunk => ', String(chunk));
                setTimeout(cb, 1500);
            }
        });
    }


    function getReadable(data) {
        return new Readable({
            encoding: 'utf8',
            objectMode: false,
            read: function (n) {
                // n =>  Number(16384)
                console.log('read is called');
                const d = data.shift();
                this.push(d ? String(d) : null);
            }
        });
    }


    const readableStrm = getReadable([1, 2, 3, 4, 5]);

    const piped = readableStrm.pipe(getWritable());

    piped.on('finish', function () {
        console.log('finish');
    });

如果运行上面的代码,我们将看到“调用 read”将被记录 5 次,远早于 Writable 中的 write 方法看到数据。

我想做的是仅当 Writable 中的 write 方法触发其回调时才调用 Readable 中的 read() ;当然,read() 方法必须首先触发一次,但随后会等待可写对象准备就绪。

有没有办法控制 read() 方法何时以某种方式在可读中触发?

最终,我真的不明白read()方法的目的是什么。

举一个简单的例子,无论我从 read() 返回什么,我都无法让它停止读取。 read 方法的意义是什么?为什么我们必须实现它?

const Readable = require('stream').Readable;

const r = new Readable({
    objectMode: true,
    read: function (n) {
        console.log('is read');
        return false/null/true;  // nothing I return here makes a difference
    }
});


r.on('data', function (d) {
    console.log(d);
});


setInterval(function(){
    r.push('valid');
},1000);

最佳答案

Node.js 流非常强大,可以对缓冲中的缓冲和通过它们的数据流提供大量控制。

现在回答您的问题:

A) 您会看到所有数据首先被读取,然后写入被触发,因为您的数据流非常小。如果您使用千字节数据进行测试,您将看到读取流和写入流按顺序触发。该顺序取决于读取流的缓冲容量和写入流创建的背压。例如,TCP 套接字读取流将比磁盘文件写入流快得多,从而产生背压。

B) 用于读写流的一个强大的构造函数选项是 highWaterMark。您可以在 documentation缓冲部分阅读更多内容。 。 highWaterMark专门定义了可读/可写流的缓冲能力。默认值为 16kb。在上面的示例中,您可以将 highWaterMark 设置为 2 个字节的可读流,如下所示,您将看到差异(在实际情况中不需要,但您可以用于学习)。

function getReadable(data) {
    let i = 0;
    return new stream.Readable({
        highWaterWark: 2,  // <--- highWaterMark set to 2 byte. Preferably set it to 10 and increase the length of your input array.
        encoding: 'utf8',
        objectMode: false,
        read: function (n) {
            // n =>  Number(16384)
            console.log('read is called');
            const d = data[i++];
            this.push(d ? String(d) : null);
        }
    });
}

较小的 highWaterMark 值很快就会产生背压,并且对于读取网络数据等某些用例可能会不利。

C) 您还可以控制读取流和写入流中的数据流。如果您的应用程序需要它,那么您可以控制可写流中的可读流。具体方法readable.pause() , readable.read([size]) , readable.resume() , readable.push(chunk[, encoding])readable.unpipe([destination])允许您控制可读流中的数据缓冲和流(甚至来自可写流)。事实上,您甚至可以使用方法readable.unshift(chunk)将数据从可写流推回到可读流。 。 有类似的方法可以控制可写流中的数据

D) readwrite 方法是流实现的一部分。这些方法用于将流数据发送到底层资源,不应直接从应用程序数据中调用。基本上,它定义了流的设置。 (不确定我是否能够清楚地解释这一点)。

我强烈建议您阅读 Node.js documentation on streams 。它将为您提供大量信息(比您从其他各种网站上找到的示例代码获得的信息还要多)。

希望以上信息对您有所帮助。

关于javascript - 在 Node.js 流中全程实现背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41648504/

相关文章:

javascript - {app, BrowserWindow} 在 JavaScript (node.js) 中是什么意思?

javascript - 使用 javascript 在 Flash 中加载外部图像?

javascript - 使用字典过滤javascript数据对象

javascript - angularjs 中可能存在级联 View 吗?

javascript - 如何使用 iframe 和 css 打印网页

javascript - 如何关闭 Mongoose 中的 strictpopulate?

node.js - 包括模型,除非属性为空 - Node Sequelize

javascript - 为什么在 javascript 中使用基于类的 OOP 样式继承?

javascript - Visual Studio 2019 中用于 Node js 应用程序的 Mocha 在引用或需要另一个文件时未在测试资源管理器中显示测试

javascript - GraphQL "Cannot return null for non-nullable"