我一辈子都想不出如何实现 stream正确处理背压。你不应该使用暂停和恢复吗?
我有这个实现,我正在尝试使其正常工作:
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
stream.Readable.call(this, {highWaterMark: highWaterMark})
this.stream = myStream
myStream.on('readable', function() {
var data = myStream.read(5000)
//process.stdout.write("Eff: "+data)
if(data !== null) {
if(!this.push(data)) {
process.stdout.write("Pause")
this.pause()
}
callback(data)
}
}.bind(this))
myStream.on('end', function() {
this.push(null)
}.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
process.stdout.write("resume")
//this.resume() // putting this in for some reason causes the stream to not output???
}
它正确地发送了输出,但没有正确地产生背压。如何更改它以正确支持背压?
最佳答案
好吧,经过大量的反复试验,我终于弄明白了。一些准则:
- 永远不要使用暂停或恢复(否则它会进入传统的“流动”模式)
- 永远不要添加“数据”事件监听器(否则它将进入传统的“流动”模式)
- 实现者有责任跟踪源代码何时可读
- 实现者有责任跟踪目的地何时需要更多数据
- 在调用
_read
方法之前,实现不应读取任何数据 read
的参数告诉源给它那么多字节,最好将传递给this._read
的参数传递给源的read
方法。通过这种方式,您应该能够配置在目的地一次读取多少,并且流链的其余部分应该是自动的。
所以我把它改成了:
更新:我创建了一个 Readable,它在适当的背压下更容易实现,并且应该具有与 Node 的 native 流一样多的灵 active 。
var Readable = stream.Readable
var util = require('util')
// an easier Readable stream interface to implement
// requires that subclasses:
// implement a _readSource function that
// * gets the same parameter as Readable._read (size)
// * should return either data to write, or null if the source doesn't have more data yet
// call 'sourceHasData(hasData)' when the source starts or stops having data available
// calls 'end()' when the source is out of data (forever)
var Stream666 = {}
Stream666.Readable = function() {
stream.Readable.apply(this, arguments)
if(this._readSource === undefined) {
throw new Error("You must define a _readSource function for an object implementing Stream666")
}
this._sourceHasData = false
this._destinationWantsData = false
this._size = undefined // can be set by _read
}
util.inherits(Stream666.Readable, stream.Readable)
Stream666.Readable.prototype._read = function(size) {
this._destinationWantsData = true
if(this._sourceHasData) {
pushSourceData(this, size)
} else {
this._size = size
}
}
Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) {
this._sourceHasData = _sourceHasData
if(_sourceHasData && this._destinationWantsData) {
pushSourceData(this, this._size)
}
}
Stream666.Readable.prototype.end = function() {
this.push(null)
}
function pushSourceData(stream666Readable, size) {
var data = stream666Readable._readSource(size)
if(data !== null) {
if(!stream666Readable.push(data)) {
stream666Readable._destinationWantsData = false
}
} else {
stream666Readable._sourceHasData = false
}
}
// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
// stream - the stream to peek at
// callback - called when there's data sent from the passed stream
var StreamPeeker = function(myStream, callback) {
Stream666.Readable.call(this)
this.stream = myStream
this.callback = callback
myStream.on('readable', function() {
this.sourceHasData(true)
}.bind(this))
myStream.on('end', function() {
this.end()
}.bind(this))
}
util.inherits(StreamPeeker, Stream666.Readable)
StreamPeeker.prototype._readSource = function(size) {
var data = this.stream.read(size)
if(data !== null) {
this.callback(data)
return data
} else {
this.sourceHasData(false)
return null
}
}
旧答案:
// creates a stream that can view all the data in a stream and passes the data through
// correctly supports backpressure
// parameters:
// stream - the stream to peek at
// callback - called when there's data sent from the passed stream
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
stream.Readable.call(this)
this.stream = myStream
this.callback = callback
this.reading = false
this.sourceIsReadable = false
myStream.on('readable', function() {
this.sourceIsReadable = true
this._readMoreData()
}.bind(this))
myStream.on('end', function() {
this.push(null)
}.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
StreamPeeker.prototype._read = function() {
this.reading = true
if(this.sourceIsReadable) {
this._readMoreData()
}
}
StreamPeeker.prototype._readMoreData = function() {
if(!this.reading) return;
var data = this.stream.read()
if(data !== null) {
if(!this.push(data)) {
this.reading = false
}
this.callback(data)
}
}
关于node.js - 如何在 node.js 中实现正确处理背压的流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29308878/