Node v.4.2.3 和 mongoose v.4.3.6
我必须迭代一个大型(>10k 文档)集合,并处理每个文档。
阅读有关如何处理此类迭代的文档时,我偶然发现了 QueryStream,我认为它可以解决我的所有问题。
function progress(total, t, current) {
process.stdout.clearLine(); // clear current text
process.stdout.write(Math.round(t / total * 100) + '% ' + t + ' / ' + total + ' ' + current);
process.stdout.cursorTo(0);
}
function loadBalance(current, stream) {
if(!stream.paused && current > 50) {
log('DEBUG', 'loadBalance', 'pause');
stream.pause();
} else if (stream.paused && current < 10) {
log('DEBUG', 'loadBalance', 'resume');
stream.resume();
}
}
var total = 0,
error = 0,
goods = 0,
current = 0;
stream = Raw.find().stream();
stream.on('data', function (doc) {
heavyProcess(doc, function (err, refined) {
current = current + 1;
loadBalance(current, stream);
printP(total, goods + error, current);
if(err) {
error = error + 1;
current = current - 1;
loadBalance(current, stream);
} else {
new Pure(refined).save(function (err) {
if(err) {
error = error + 1;
current = current - 1;
loadBalance(current, stream);
} else {
goods = goods + 1;
current = current - 1;
loadBalance(current, stream);
}
});
}
});
}).on('error', function (err) {
log('ERROR', 'stream', err);
}).on('close', function () {
log('INFO', 'end', goods + ' / ' + total + ' ( ' + (goods/total*100) + '%) OK_');
log('INFO', 'end', error + ' / ' + total + ' ( ' + (error/total*100) + '%) NOK');
log('INFO', 'end', (total - goods - error) + ' missing');
});
loadBalance 确实被调用,并打印它正在暂停流,但 'data'
事件继续被触发,即使 stream.paused
返回 true。
我是否误解了 pause()
的作用?或者我误用了 QueryStream?
最佳答案
Mongoose 查询流是 v1 流。在文档中称为 Node 0.8 ReadStream ( http://mongoosejs.com/docs/api.html#querystream_QueryStream )
这意味着暂停事件是“建议”https://nodejs.org/api/stream.html#stream_compatibility_with_older_node_js_versions
这里的Advisory是指调用pause后,部分数据事件仍然会漏槽。
这与底层流缓存有关,并且是正确的流 v1 行为。
您将必须使用调用暂停后生成的任何数据事件。
从开发人员的 Angular 来看,这种行为当然不是最佳的,这就是它在流 v2 中进行更改的原因 ( https://nodejs.org/en/blog/feature/streams2/ )
这是一个与 v2 查询流相关的 mongoogejs 问题,我认为短期内没有任何计划实现 v2 查询流。
https://github.com/Automattic/mongoose/issues/1907
引用问题,这可能是解决您问题的方法:
var readStream = (new stream.Readable({ objectMode: true })).wrap(Model.find({}).stream());
关于javascript - Mongoose QueryStream.pause() 不暂停?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34936618/