javascript - nodejs : read from file and store to db, 限制最大并发数据库操作

标签 javascript node.js asynchronous concurrency

我有一个 CSV 文件,我正在将其作为流读取,并使用转换转换为 JSON,然后将每一行异步存储到数据库中。

问题是从文件中读取速度很快,因此会导致大量并发异步数据库操作,从而导致应用程序停止运行。

我想限制应用程序,以便在任何给定时间最多进行 N 个未完成的数据库操作。

这是我的 _transform 函数的基本核心:

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    //Store tick
    db.set(tick.date, tick, function(err, result) {
      console.log(result);
      if(err) throw err;
    });

    this.push(tick);
    done();
};

我看过几个选项,但这些似乎是最佳候选:

  • 使用异步 api 'forEachLimit'
    • 我在这里看到的问题是,在我的流转换中,发出操作时我只对一个对象(文件中的行)进行操作。
    • 由于大小原因,读取整个文件是不可行的
  • 使用此处第 7.2.3 节中所述的异步、并行、并发限制解决方案:
    • http://book.mixu.net/node/ch7.html
    • 这里的问题是在达到“限制”的情况下该怎么做。
    • 旋转或使用 setTimeout 似乎用完了所有预定时间并阻止了我的数据库回调,这应该减少正在启动的“运行”计数器。

这是我对“并发限制解决方案”的初步尝试:

var limit = 100;
var running = 0;

parser._transform = function(data, encoding, done) {
  //push data rows
  var tick = this._parseRow(data);

  this.push(tick);
  //Store tick to db
  if (running < limit) {
    console.log("limit not reached, scheduling set");
    running++;
    cb.set(tick.date, tick, function(err, result) {
      running--;
      console.log("running is:" + running);
      console.log(result);
      if(err) throw err;
    });
  } else {
    console.log("max limit reached, sleeping");
    setTimeout(this._transform(data, encoding, done),1000);
  }
  done();
};

我这周才开始使用 node.js,所以我不清楚解决这个问题的正确模型是什么。

注意:我知道的几件事是,如果使用后一种模型,这至少应该是指数退避,并且应该有一些“最大退避”系统,所以以免破坏调用堆栈。不过,这里暂时尽量保持简单。

最佳答案

并发限制解决方案选项是我会采用的方法,但我不会自己实现,而是使用 async模块。具体来说,queue方法。

类似于:

var dbQueue = async.queue(function(tick, callback) {
    db.set(tick.date, tick, function(err, result) {
        console.log(result);
        callback(err, result);
    });
}, 3); // the last arg (3) is the concurrency level; tweak as needed

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    dbQueue.push(tick);

    this.push(tick);
    done();
};

这会将您的数据库操作限制为一次 3 个。此外,您可以使用队列的 saturatedempty 事件来pause/resume 您的流以保持更多内容在资源使用方面受到限制(如果您正在阅读非常大的文件,这会很好)。这看起来像:

dbQueue.saturated = function() {
    parser.pause();
}

dbQueue.empty = function() {
    parser.resume();
}

关于javascript - nodejs : read from file and store to db, 限制最大并发数据库操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25098022/

相关文章:

node.js - 403 不允许增加Parents的数量Google Drive API React Native

python - 登录异步 Tornado (python) 服务器

javascript - IE7 中的 JavaScript

javascript - If else 语句不适用于待办事项列表

javascript - 即 obj.attachEvent() 的例子

javascript - 发出的事件是否会产生大量负载,即使它们没有被监视?

node.js - 如何在 CouchDB 中使用给定参数进行查询?

c# - 在 C# 中执行异步触发时省略 await 语句并忘记使用 ado.net 写入数据库是否安全

c# - 异步和等待混淆

javascript - 使用 javascript/jquery 获取复杂嵌套 JSON 的各层数据