我正在尝试将大型 csv 文件(100K 行;10-100M+)上传并插入到 mongo 中。
下面的代码是我用来接受来自表单的输入并首先将记录插入到我所有 csv 的元数据集合中,然后将 csv 的记录插入到它自己的集合中的路径。它适用于较小的文件(数千行),但当它达到 50K+ 的顺序时会花费很长时间。
下一个片段是将 csv 流用于较大的文件(见下文),但在尝试使用该流时出现错误。
问题:有人可以帮助将第一个示例修改为流,以便它可以处理大型 csv 而不会挂起。
exports.addCSV = function(req,res){
var body = req.body;
fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){
if(err){
fileSystem.unlink(req.files.myCSV.path, function(){});
throw error;
}
});
var myObject = { userid: body.userid,
name: body.name,
description: body.description
};
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){
if(err) throw err;
var collection = db.collection('myCSVs');
collection.insert(myObject, function(err, insertedMyObject){
csvParser.mapFile('uploads/myFile', function(err, allRows){
if (err) throw err;
var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
for (r in allRows) {
allRows[r].metric = parseFloat(allRows[r].metric);
}
var finalcollection = db.collection(collectionId);
finalcollection.insert(allRows, function(err, insertedAllRows) {
if (err) {
res.send(404, "Error");
}
else {
res.send(200);
}
});
});
});
});
}
编辑(让人们删除保留状态):
我使用流尝试了这种方法:
exports.addCSV = function(req,res){
var body = req.body;
fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){
if(err){
fileSystem.unlink(req.files.myCSV.path, function(){});
throw error;
}
});
var myObject = { userid: body.userid,
name: body.name,
description: body.description
};
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){
if(err) throw err;
var collection = db.collection('myCSVs');
collection.insert(myObject, function(err, insertedMyObject){
var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
var finalcollection = db.collection(collectionId);
var q = async.queue(finalcollection.insert.bind(finalcollection), 5);
q.drain = function() {
console.log('all items have been processed');
}
csv()
.from.path('uploads/myFile', {columns: true})
.transform(function(data, index, cb){
q.push(data, cb);
})
.on('end', function () {
res.send(200);
console.log('on.end() executed');
})
.on('error', function (err) {
res.end(500, err.message);
console.log('on.error() executed');
});
});
});
}
但是我得到这个错误:
events.js:72
throw er; // Unhandled 'error' event
^
TypeError: object is not a function
第三,我尝试了这种流媒体方式:
var q = async.queue(function (task,callback) {
finalollection.insert.bind(task,function(err, row) { });
callback();
}, 5);
q.drain = function() {
console.log('all items have been processed');
}
csv()
.from.path('uploads/myFile', {columns: true})
.transform(function(data, index, cb){
q.push(data)
})
.on('end', function () {
res.send(200);
console.log('on.end() executed');
})
.on('error', function (err) {
res.end(500, err.message);
console.log('on.error() executed');
});
这会插入一些然后中止:
all items have been processed
all items have been processed
Error: Request aborted
at IncomingMessage.<anonymous>
这个实际上试图将同一 csv 的多个集合插入到数据库中。最后,我尝试了 q 的单行定义:
var q = async.queue(finalcollection.insert.bind(finalcollection), 5);
连同:
.transform(function(data, index, cb){
q.push(data,function (err) {
console.log('finished processing foo');
});
})
它多次插入集合并每次都中止(下面是每次发生的输出 - 为什么它没有正确退出并重新插入?):
finished processing foo
finished processing foo
finished processing foo
finished processing foo
finished processing foo
all items have been processed
Error: Request aborted
at IncomingMessage.<anonymous> (.../node_modules/express/node_modules/connect/node_modules/multiparty/index.js:93:17)
at IncomingMessage.EventEmitter.emit (events.js:92:17)
at abortIncoming (http.js:1892:11)
at Socket.serverSocketCloseListener (http.js:1904:5)
at Socket.EventEmitter.emit (events.js:117:20)
at TCP.close (net.js:466:12)
最佳答案
您应该使用流处理大文件。
这是一个可能的解决方案:
var queue = async.queue(collection.insert.bind(collection), 5);
csv()
.from.path('./input.csv', { columns: true })
.transform(function (data, index, cb) {
queue.push(data, function (err, res) {
if (err) return cb(err);
cb(null, res[0]);
});
})
.on('error', function (err) {
res.send(500, err.message);
})
.on('end', function () {
queue.drain = function() {
res.send(200);
};
});
请注意:
- 我们使用
node-csv
的流 API ,这确保了在读取文件的同时处理数据:这样就不会立即在内存中读取整个文件。transform
处理程序为每条记录执行; - 我们使用
async.queue
,这是一个异步处理队列:最多并行执行5个处理程序(finalcollection.insert
)。
这个例子应该被测试,因为我不确定它是否真的能很好地处理背压。此外,队列的并发级别应根据您的具体配置进行调整。
您还可以找到 working gist here .
关于jquery - 使用 Node.js 和 async.queue 将大型 CSV 插入 MongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20722891/