在我的 MMO 人口普查/Angular 色统计跟踪应用程序中,我从用户处获取输入批处理,每个批处理最多包含 5-20k 个文档,我需要将其聚合到数据库中。我需要寻找特定的标准来确定输入中的文档是否已存在于集合中并且需要更新,或者它是否是全新的并且需要插入到集合中。
为了让我的应用程序正常工作,我可以准确地确定在处理输入后实际更新和/或直接添加了多少文档,这一点也非常重要。
为了更好地解释我想要做什么,我将其分解为一个简化的示例,我可以向您展示输入的样子以及所需的结果。
<小时/>作为以下输入案例的起点,集合如下所示:
collection = [
{ name: 'Jean', server: 'Alpha', level: 9 },
{ name: 'Anna', server: 'Beta', level: 17 },
{ name: 'Jean', server: 'Beta', level: 10 }
];
基本上,我需要涵盖 3 个输入案例。
<小时/>案例#1
当我收到具有全新名称
+服务器
组合的输入时,应将新文档添加到集合中
input = { name: 'Victor', server: 'Alpha', level: 22 };
应该变成:
collection = [
{ name: 'Jean', server: 'Alpha', level: 9 },
{ name: 'Anna', server: 'Beta', level: 17 },
{ name: 'Jean', server: 'Beta', level: 10 },
{ name: 'Victor', server: 'Alpha', level: 22 }
];
案例#2
当我收到包含现有name
+server
组合的输入,但具有更高级别
时,应更新现有文档
input = { name: 'Jean', server: 'Alpha', level: 10 };
应该变成
collection = [
{ name: 'Jean', server: 'Alpha', level: 10 },
{ name: 'Anna', server: 'Beta', level: 17 },
{ name: 'Jean', server: 'Beta', level: 10 }
];
案例#3
当我收到包含现有 name
+server
组合的输入,但等于或更低时级别
,什么都不会发生,集合应该保持原样
input = { name: 'Jean', server: 'Alpha', level: 9 };
或
input = { name: 'Jean', server: 'Alpha', level: 8 };
应该保留:
collection = [
{ name: 'Jean', server: 'Alpha', level: 9 },
{ name: 'Anna', server: 'Beta', level: 17 },
{ name: 'Jean', server: 'Beta', level: 10 }
];
<小时/>
到目前为止我所做的基本上是将整个集合提取到一个数组中,然后使用 Array.filter 找出集合中已经存在的输入并使用 更新它们findOneAndUpdate
,以及哪些输入是新的,然后使用 insertMany
将它们插入到集合中:
Test.find({}, async (err, documents) => {
if (err) return console.log(err);
if (documents.length > 0) {
const changedInputs = inputs.filter(byChanged(documents));
const newInputs = inputs.filter(byNew(documents));
const insertResult = await Test.insertMany(newInputs);
const inserted = insertResult.length;
const updateResults = await Promise.all(compileUpdatePromises(changedInputs));
let updated = 0;
updateResults.forEach(updateResult => {
updated = updateResult === 'updated' ? updated + 1 : updated;
});
console.log('updated:', updated);
console.log('inserted:', inserted);
}
});
Link to a gist with the whole example
当集合中没有太多文档时,这工作得很好,但现在它已经增长到 50k+ 文档,它变得非常慢,并在此过程中阻塞 mongo 连接,这也阻塞了所有其他调用的整个 api。
一旦这个应用程序获得更多流量,它就有可能快速增长到包含一百万个文档的集合,然后不断更新。
是否有任何简单且更有效的方法可以让 mongodb 为我完成所有这些艰苦的工作,而不是我自己完成所有工作?
<小时/>更新1:
根据simagix和黑化的建议,我非常接近解决方案。这就是我更改后的代码现在的样子:
const bulkInput = inputs.map(input => ({
updateOne: {
filter: { name: input.name, server: input.server, level: { $lte: input.level } },
upsert: true,
update: { $set: { name: input.name, server: input.server, level: input.level } }
}
}));
Test.bulkWrite(bulkInput).then(result => {
console.log('inserted:', result.nUpserted, 'updated:', result.nModified);
});
现在的问题是案例#3的第二个示例:
input = { name: 'Jean', server: 'Alpha', level: 8 };
结果:
collection = [
{ name: 'Jean', server: 'Alpha', level: 9 },
{ name: 'Anna', server: 'Beta', level: 17 },
{ name: 'Jean', server: 'Beta', level: 10 },
{ name: 'Jean', server: 'Alpha', level: 8 }
];
<小时/>
更新2:
只需制作复合索引
testSchema.index({ name: 1, server: 1 });
到唯一的复合索引
testSchema.index({ name: 1, server: 1 }, { unique: true });
现在我必须找到一个正确的解决方案来处理案例 #3 示例 2 引发的 E11000 重复键错误
异常。
最佳答案
首先,设置复合索引。 https://docs.mongodb.com/manual/core/index-compound/
在 mongodb 和 mongoose 上都可用。
其次,请编写正确的检索查询。 $or ( https://docs.mongodb.com/manual/reference/operator/query/or/ ) 当索引支持时是 O(k log n),其中 k 是匹配项的数量。
或者,尝试批量操作。 https://docs.mongodb.com/manual/reference/method/Bulk/ 。
它可以返回成功查找/更新的数量。 https://docs.mongodb.com/manual/reference/method/BulkWriteResult/ 。添加一个额外的字段来查找级别: { $lt: currlvl } 仅有条件地进行更新。我不太清楚如何将其与更新插入结合起来。
最后,如果我是你,我会散列/连接服务器和名称并将其设为 id。会让生活变得更加轻松。
关于javascript - 使用 mongodb/mongoose 有条件地将 5-20k 文档的输入批处理处理为包含多达一百万个文档的集合的有效方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56824760/