我有一个 CSV 文件,我正在将其作为流读取,并使用转换转换为 JSON,然后将每一行异步存储到数据库中。
问题是从文件中读取速度很快,因此会导致大量并发异步数据库操作,从而导致应用程序停止运行。
我想限制应用程序,以便在任何给定时间最多进行 N 个未完成的数据库操作。
这是我的 _transform 函数的基本核心:
parser._transform = function(data, encoding, done) {
//push data rows
var tick = this._parseRow(data);
//Store tick
db.set(tick.date, tick, function(err, result) {
console.log(result);
if(err) throw err;
});
this.push(tick);
done();
};
我看过几个选项,但这些似乎是最佳候选:
- 使用异步 api 'forEachLimit'
- 我在这里看到的问题是,在我的流转换中,发出操作时我只对一个对象(文件中的行)进行操作。
- 由于大小原因,读取整个文件是不可行的
- 使用此处第 7.2.3 节中所述的异步、并行、并发限制解决方案:
- http://book.mixu.net/node/ch7.html
- 这里的问题是在达到“限制”的情况下该怎么做。
- 旋转或使用 setTimeout 似乎用完了所有预定时间并阻止了我的数据库回调,这应该减少正在启动的“运行”计数器。
这是我对“并发限制解决方案”的初步尝试:
var limit = 100;
var running = 0;
parser._transform = function(data, encoding, done) {
//push data rows
var tick = this._parseRow(data);
this.push(tick);
//Store tick to db
if (running < limit) {
console.log("limit not reached, scheduling set");
running++;
cb.set(tick.date, tick, function(err, result) {
running--;
console.log("running is:" + running);
console.log(result);
if(err) throw err;
});
} else {
console.log("max limit reached, sleeping");
setTimeout(this._transform(data, encoding, done),1000);
}
done();
};
我这周才开始使用 node.js,所以我不清楚解决这个问题的正确模型是什么。
注意:我知道的几件事是,如果使用后一种模型,这至少应该是指数退避,并且应该有一些“最大退避”系统,所以以免破坏调用堆栈。不过,这里暂时尽量保持简单。
最佳答案
并发限制解决方案选项是我会采用的方法,但我不会自己实现,而是使用 async模块。具体来说,queue方法。
类似于:
var dbQueue = async.queue(function(tick, callback) {
db.set(tick.date, tick, function(err, result) {
console.log(result);
callback(err, result);
});
}, 3); // the last arg (3) is the concurrency level; tweak as needed
parser._transform = function(data, encoding, done) {
//push data rows
var tick = this._parseRow(data);
dbQueue.push(tick);
this.push(tick);
done();
};
这会将您的数据库操作限制为一次 3 个。此外,您可以使用队列的 saturated
和 empty
事件来pause
/resume
您的流以保持更多内容在资源使用方面受到限制(如果您正在阅读非常大的文件,这会很好)。这看起来像:
dbQueue.saturated = function() {
parser.pause();
}
dbQueue.empty = function() {
parser.resume();
}
关于javascript - nodejs : read from file and store to db, 限制最大并发数据库操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25098022/