这听起来像是 Q
或 async
等库的一个非常典型的用例,但我无法真正弄清楚什么是最好的方法。
我想导入一个包含 150 行的 CSV 文件(使用 node-csv
),并为每行创建一个 mongo 文档。
然而,流解析似乎比“数据库插入”完成得更快,所以我遇到了
回调被过早调用的问题。
// importtest.mocha.js
[...]
importer.loadFromCsv (url, function(result) {
result.length.should.be.equal (150); // nope, it's always around 41
}
// importer.js
function loadFromCsv (url, callback){
csv().from.stream(url)
.on ('record', function(record, index){
new Row({data: record}).save(function() {
console.log ('saved a row to db');
});
})
.on ('end', function() {
callback (Row.find({})); // E parser finished, but probably not all Row.save()
});
}
所以,请任何人给我一个提示,我可以如何使用异步/ promise 来解决这个问题,以便 虽然流解析/数据库插入是异步的,但最终的回调只有在所有插入完成后才会完成?
最佳答案
当您插入许多记录时,您应该单独处理每条记录。这是一个未经测试的代码片段,您可以尝试并调整。实际上,您创建了一个 promise 列表,当所有 promise 都得到解决时,传递给 then(fn) 的函数将被触发。嗯,正如代码中提到的,您应该注意有错误的记录。请注意,只有当所有 promise 都得到解决(成功)时,传递给 then(fn) 的函数才会执行。要指示记录的 promise 错误,您应该使用 defer.reject() 而不是 def.resolve()。然后还传递 onErrorFn 占位符的函数。它类似于 SQL 事务。
这里是包含注释的代码:
var q = require('q');
function loadFromCsv (url, callback){
// create an array holding all promises
var csv_promises = [];
csv().from.stream(url)
.on ('record', function(record, index){
// create new defer object, per row
var row_defer = q.defer();
// make sure, this function gets called, only after the row got saved
new Row({data: record}).save(function() {
console.log ('saved a row to db');
// resolves the promise, per row
row_defer.resolve(record);
// todo: take care for an error, per row
});
csv_promises.push(row_defer.promise); // add promise to promise list, per row
})
.on ('end', function() {
// callback (Row.find({})); // E parser finished, but probably not all Row.save()
// q.all gets resolved and fires passed function as soon as ALL promises in csv_promises array are resolved
// todo: take care for errors
q.all(csv_promises).then(function() {
callback( csv_promises );
} /*, onErrorFn */ );
}
loadFromCsv( "URL", function(rows) {
console.log("Treated rows: ", rows.length);
});
关于javascript - Node.js 从流导入数据库后如何同步最终回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20582703/