javascript - 使用 Node.js 在流中进行异步调用的问题

标签 javascript node.js asynchronous stream

我在流中进行异步调用时遇到问题。看起来由于某种原因,如果我有三个流并且中间流进行异步调用,则最终流永远不会收到“结束”事件。我可以用一些简单的 through 来模拟行为流和超时。

var options = {}
var streamOne = function(){
  return through(function write(row){
    this.emit('data', row)       
  })
}

var streamTwo = function(){
  return through(function write(row){
    this.pause()
    var that = this;

    setTimeout(function(){
      that.emit('data', row)
      that.resume()
    }, 1000)
  })
}

options.streams = [new streamOne(), new streamTwo()]

然后我将其传递给 event-stream

var streams = []
//Pass stream one and stream two to es.pipe
streams.push(es.pipe.apply(this, options.streams))

//Then add the final stream 
var endStream = es.pipe(
  new streamThree()
)
streams.push(endStream)

//And send it through the pipeline
es.pipeline.apply(this, streams)

因此,这在当前情况下不起作用。

几个令人困惑的点:如果我删除streamOne,它就可以工作!如果streamTwo不进行异步调用,它就可以工作。这让我认为问题在于两个流交互的方式。但是,如果我在整个代码中 console.log ,看起来一切正常,streamThree 将写入数据但永远不会注册“结束”事件。 *注意:streamThree 没有使用 through,而是使用 native Streams 模块。

思考为什么会发生这种情况?

最佳答案

运行一些测试用例后,看起来管道或直通没有正确处理 I/O。我不完全确定为什么会出现这种情况,但我认为这是由于流的暂停和恢复而导致问题的竞争条件。我做了一些事情来清理代码:

1) 简化管道。我没有在管道内执行嵌套的 es.pipe,而是直接将流放入其中。这有助于更好地管理流之间的数据流。

2) 我没有从常规流中发出数据,而是使用 this.queue 对数据进行排队,让模块处理潜在的背压。

3) 我使用了事件流方法es.map处理异步调用的流程。 我认为这是一个更好的解决方案,因为它更干净地处理流的暂停和恢复,并且仍然返回直通流。

关于javascript - 使用 Node.js 在流中进行异步调用的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18706222/

相关文章:

javascript - 对理解闭包有一点帮助吗?

javascript - 我想使用 amount_t 属性添加这些金额。并显示在文本框中

javascript - 在 Vue + Webpack 中使用 Puppeteer (Headless Chrome) 时的依赖错误

node.js - 如何使用 Node.js 连接到 Cloud SQL?

c# - PostAsync 后未收到响应

javascript - 有没有更短的方法来编写这个 Jquery 代码?

php - 在 JQuery/Javascript 中使用值的问题

javascript - 我的项目中可以有多个 gruntjs 文件用于代码组织吗?

javascript - 返回 false 是否可以通过异步函数传递 ESLint 一致返回错误?

c# - 如何将松散耦合和可扩展的设计与可能的异步实现相结合?