javascript - 使用 setTimeout 时,node.js 处理流背压

标签 javascript node.js asynchronous concurrency

这是我在之前的问题中遇到的进一步问题的后续问题:

nodejs: read from file and store to db, limit maximum concurrent db operations

问题:

我想有条件地在以后重新安排一些操作,但这破坏了我处理背压的方法。

详细信息:

我有一个 CSV 文件,我正在将其作为流读取,并使用转换将其转换为 JSON,然后将每一行异步存储到数据库中。

当行由转换处理时,它们被放置到负责发出数据库操作的异步队列中。

例如

parser._transform = function(data, encoding, done) {

    var tick = this._parseRow(data);

    dbQueue.push(tick, function(err, result) {
      if (typeof(err) != 'undefined') { console.log(err) }
    });

    this.push(tick);
    done();
}

当队列饱和/空时,通过暂停和恢复解析器来处理背压:

dbQueue.saturated = function() {
  parser.pause();
}

dbQueue.empty = function() {
  parser.resume();
}

我一直试图做出的改变是,当一个项目从队列中取出时,它会在未来有条件地重新安排一段时间(100毫秒):

var dbQueue = async.queue(function(data, callback) {
  if (condition) {
    // re-schedule operation by adding back to queue 100ms later
    setTimeout(function(data, callback) {
        dbQueue.push(data, function(err, result){
      });
    }, 100, data, callback);
  } else {
    //execute the db store
     ... ...
  }
}

我相信我的问题是,现在许多操作将大部分时间花在 setTimeout 上,因此 dbQueue 将为空,并且转换流上的背压没有按预期处理。

我尝试过使用 max_ops 和 running_ops 等计数器来确保流暂停/恢复,但没有成功。

在 Node.js 中是否有更惯用的处理方法?

最佳答案

由于这看起来像是一个外部条件,与 dbQueue 正在执行的操作无关,因此当条件发生时,我不会将数据重新插入队列,而是简单地暂停 dbQueue。例如,假设您的情况是数据库由于某种原因断开连接,并且您可以监听一个事件。在这种情况下,您可以执行与 dbQueue 饱和/空时类似的操作:

db.on('disconnect', function() {
    dbQueue.pause();
});

db.on('connect', function() {
    dbQueue.resume();
});

这通常是比等待某些预先确定的超时更好的方法。话虽这么说,有时等待超时是唯一的选择。在这种情况下,您可以执行类似的操作,但无需等待单独的事件来触发 resume(),只需使用 setTimeout():

db.on('disconnect', function() {
    dbQueue.pause();
    setTimeout(function() {
        dbQueue.resume();
    });
});

注意:如果我们真的在这里讨论数据库断开连接,那么如果在 100 毫秒时间不够的情况下出现数据库错误,您可能还需要暂停/恢复dbQueue让数据库重新连接

如果您正在寻找更具体的条件,并且您愿意分享它是什么,我也许可以给您一个更好的例子:)

关于javascript - 使用 setTimeout 时,node.js 处理流背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25126887/

相关文章:

javascript - 在 Controller 之间传递常量值 angularjs

javascript - 如何从范围内的变量填充 angularjs 中的单选按钮并将任何更改收集到范围内

html - 没有 Content-Length 的流媒体响应

javascript - 使用 $http 调用函数后 $scope 成员未更新

javascript - 加载 angularjs 路由后运行 javascript 代码

javascript - 在 Ember 中保存预订时使用关系更新已经存在的事件

javascript - 如果输入字段为空且有值,则不执行任何操作,检查它是否为数字,如果不是则显示警报

javascript - angular.js 端到端测试和 $timeout 问题

node.js - Mongoose 点符号 $inc 更新

javascript - 为什么我不能捕获从 node-postgres 抛出的错误?