我有一个定期运行的函数,它更新我的 Prices
集合中一些 Documents
的 item.price
。 Price Collection
包含 100k 多个项目。该函数如下所示:
//Just a helper function for multiple GET requests with request.
let _request = (urls, cb) => {
let results = {}, i = urls.length, c = 0;
handler = (err, response, body) => {
let url = response.request.uri.href;
results[url] = { err, response, body };
if (++c === urls.length) {
cb(results);
}
};
while (i--) {
request(urls[i], handler);
}
};
// function to update the prices in our Prices collection.
const update = (cb) => {
Price.remove({}, (err, remove) => {
if (err) {
return logger.error(`Error removing items...`);
}
logger.info(`Removed all items... Beginning to update.`);
_request(urls, (responses) => {
let url, response, gameid;
for (url in responses) {
id = url.split('/')[5].split('?')[0];
response = responses[url];
if (response.err) {
logger.error(`Error in request to ${url}: ${err}`);
return;
}
if (response.body) {
logger.info(`Request to ${url} successful.`)
let jsonResult = {};
try {
jsonResult = JSON.parse(response.body);
} catch (e) {
logger.error(`Could not parse.`);
}
logger.info(`Response body for ${id} is ${Object.keys(jsonResult).length}.`);
let allItemsArray = Object.keys(jsonResult).map((key, index) => {
return {
itemid: id,
hash_name: key,
price: jsonResult[key]
}
});
Price.insertMany(allItemsArray).then(docs => {
logger.info(`Saved docs for ${id}`)
}, (e) => {
logger.error(`Error saving docs.`);
});
}
}
if (cb && typeof cb == 'function') {
cb();
}
})
});
}
如您所见,为了避免遍历 100k+ 文档并分别更新每个文档,我在开始时将它们全部删除,然后调用为我提供这些商品价格的 API,并使用 InsertMany
将它们全部插入到我的价格集合中。
此更新过程每 30 分钟进行一次。
但我现在才意识到,如果某些用户想要查看价格,而我的 Prices Collection
目前是空的,因为它正在 self 更新怎么办?
问题
那么我是否必须遍历所有这些才能不删除它? (请记住,有许多文档每 30 分钟要更新一次。)或者有其他解决方案吗?
这是我的价格集合
的图片(有 10 万个这样的文档,我只想更新价格属性):
更新:
我重写了一些update
函数,现在它看起来像这样:
const update = (cb = null) => {
Price.remove({}, (err, remove) => {
if (err) {
return logger.error(`Error removing items...`);
}
logger.info(`Removed all items... Beginning to update.`);
_request(urls, (responses) => {
let url, response, gameid;
for (url in responses) {
gameid = url.split('/')[5].split('?')[0];
response = responses[url];
if (response.err) {
logger.error(`Error in request to ${url}: ${err}`);
return;
}
if (response.body) {
logger.info(`Request to ${url} successful.`)
let jsonResult = {};
try {
jsonResult = JSON.parse(response.body);
} catch (e) {
logger.error(`Could not parse.`);
}
logger.info(`Response body for ${gameid} is ${Object.keys(jsonResult).length}.`);
let allItemsArray = Object.keys(jsonResult).map((key, index) => {
return {
game_id: gameid,
market_hash_name: key,
price: jsonResult[key]
}
});
let bulk = Price.collection.initializeUnorderedBulkOp();
allItemsArray.forEach(item => {
bulk.find({market_hash_name: item.market_hash_name})
.upsert().updateOne(item);
});
bulk.execute((err, bulkers) => {
if (err) {
return logger.error(`Error bulking: ${e}`);
}
logger.info(`Updated Items for ${gameid}`)
});
// Price.insertMany(allItemsArray).then(docs => {
// logger.info(`Saved docs for ${gameid}`)
// }, (e) => {
// logger.error(`Error saving docs.`);
// });
}
}
if (cb && typeof cb == 'function') {
cb();
}
})
});
}
现在请注意批量变量(感谢@Rahul),但现在,集合需要很长时间才能更新。我的处理器快耗尽了,更新 60k+ 文档实际上需要 3 分钟以上。老实说,我觉得像以前的方法,虽然它可能会删除所有这些然后重新插入它们,但它也需要 10 倍的速度。
有人吗?
最佳答案
根据我的经验(每小时更新数百万个 mongo 文档),这是一个非常大的批量更新的现实方法:
- 单独执行所有 API 调用并将结果作为 bson 写入文件
- 调用
mongoimport
并将该 bson 文件导入到一个新的空集合prices_new
中。 Javascript,更不用说高级 OO 包装器,都太慢了 - 重命名
prices_new
->prices
dropTarget=true
(这将是原子的,因此没有停机时间)
从原理上讲,它在 JS 中看起来像这样
let fname = '/tmp/data.bson';
let apiUrls = [...];
async function doRequest(url) {
// perform a request and return an array of records
}
let responses = await Promise.all(apiUrls.map(doRequest));
// if the data too big to fit in memory, use streams instead of this:
let data = flatMap(responses, BSON.serialize).join('\n'));
await fs.writeFile(fname, data);
await child_process.exec(`mongoimport --collection prices_new --drop ${fname}`);
await db.prices_new.renameCollection('prices', true);
关于javascript - 以最有效的方式更新许多(100k+)文档 MongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48852556/