node.js - 如何在 node.js 中实现正确处理背压的流?

标签 node.js stream backpressure

我一辈子都想不出如何实现 stream正确处理背压。你不应该使用暂停和恢复吗?

我有这个实现,我正在尝试使其正常工作:

var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
    stream.Readable.call(this, {highWaterMark: highWaterMark})
    this.stream = myStream

    myStream.on('readable', function() {
        var data = myStream.read(5000)
        //process.stdout.write("Eff: "+data)
        if(data !== null) {
            if(!this.push(data)) {
                process.stdout.write("Pause")
                this.pause()
            }
            callback(data)
        }
    }.bind(this))

    myStream.on('end', function() {
        this.push(null)
    }.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
    process.stdout.write("resume")
    //this.resume() // putting this in for some reason causes the stream to not output???
}

它正确地发送了输出,但没有正确地产生背压。如何更改它以正确支持背压?

最佳答案

好吧,经过大量的反复试验,我终于弄明白了。一些准则:

  • 永远不要使用暂停或恢复(否则它会进入传统的“流动”模式)
  • 永远不要添加“数据”事件监听器(否则它将进入传统的“流动”模式)
  • 实现者有责任跟踪源代码何时可读
  • 实现者有责任跟踪目的地何时需要更多数据
  • 在调用 _read 方法之前,实现不应读取任何数据
  • read 的参数告诉源给它那么多字节,最好将传递给 this._read 的参数传递给源的 read 方法。通过这种方式,您应该能够配置在目的地一次读取多少,并且流链的其余部分应该是自动的。

所以我把它改成了:

更新:我创建了一个 Readable,它在适当的背压下更容易实现,并且应该具有与 Node 的 native 流一样多的灵 active 。

var Readable = stream.Readable
var util = require('util')

// an easier Readable stream interface to implement
// requires that subclasses:
    // implement a _readSource function that
        // * gets the same parameter as Readable._read (size)
        // * should return either data to write, or null if the source doesn't have more data yet
    // call 'sourceHasData(hasData)' when the source starts or stops having data available
    // calls 'end()' when the source is out of data (forever)
var Stream666 = {}
Stream666.Readable = function() {
    stream.Readable.apply(this, arguments)
    if(this._readSource === undefined) {
        throw new Error("You must define a _readSource function for an object implementing Stream666")
    }

    this._sourceHasData = false
    this._destinationWantsData = false
    this._size = undefined // can be set by _read
}
util.inherits(Stream666.Readable, stream.Readable)
Stream666.Readable.prototype._read = function(size) {
    this._destinationWantsData = true
    if(this._sourceHasData) {
        pushSourceData(this, size)
    } else {
        this._size = size
    }
}
Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) {
    this._sourceHasData = _sourceHasData
    if(_sourceHasData && this._destinationWantsData) {
        pushSourceData(this, this._size)
    }
}
Stream666.Readable.prototype.end = function() {
    this.push(null)
}
function pushSourceData(stream666Readable, size) {
    var data = stream666Readable._readSource(size)
    if(data !== null) {
        if(!stream666Readable.push(data)) {
            stream666Readable._destinationWantsData = false
        }
    } else {
        stream666Readable._sourceHasData = false
    }
}    

// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
    // stream - the stream to peek at
    // callback - called when there's data sent from the passed stream
var StreamPeeker = function(myStream, callback) {
    Stream666.Readable.call(this)
    this.stream = myStream
    this.callback = callback

    myStream.on('readable', function() {
        this.sourceHasData(true)
    }.bind(this))
    myStream.on('end', function() {
        this.end()
    }.bind(this))
}
util.inherits(StreamPeeker, Stream666.Readable)
StreamPeeker.prototype._readSource = function(size) {
    var data = this.stream.read(size)
    if(data !== null) {
        this.callback(data)
        return data
    } else {
        this.sourceHasData(false)
        return null
    }
}

旧答案:

// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
    // stream - the stream to peek at
    // callback - called when there's data sent from the passed stream
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
    stream.Readable.call(this)
    this.stream = myStream
    this.callback = callback
    this.reading = false
    this.sourceIsReadable = false

    myStream.on('readable', function() {
        this.sourceIsReadable = true
        this._readMoreData()
    }.bind(this))

    myStream.on('end', function() {
        this.push(null)
    }.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
    this.reading = true
    if(this.sourceIsReadable) {
        this._readMoreData()
    }
}
StreamPeeker.prototype._readMoreData = function() {
    if(!this.reading) return;

    var data = this.stream.read()
    if(data !== null) {
        if(!this.push(data)) {
            this.reading = false
        }
        this.callback(data)
    }
}

关于node.js - 如何在 node.js 中实现正确处理背压的流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29308878/

相关文章:

javascript - Nodejs 中的 session 管理

node.js - NodeJs 中的子进程生成输出文件和目录

node.js - 将十六进制字符串转换为二进制数据放入缓冲区

asp.net - 使用FileStream ASPNET发送大文件500MB时出现OutOfMemoryException

Dart/flutter : Error "Stream has already been listened to." && "await for" inside fa "for loop" fails

java - 序列输入流比文件输入流快吗

javascript - RxJs:zip 运算符的有损形式

java - 从 Flux<Integer> 中分块读取

javascript - Highland.js如何实现背压?

node.js - 如何使用 Node.js/Electron 确定 Windows 10 深色模式?