javascript - 以最有效的方式更新许多(100k+)文档 MongoDB

标签 javascript node.js mongodb mongoose promise

我有一个定期运行的函数,它更新我的 Prices 集合中一些 Documentsitem.pricePrice Collection 包含 100k 多个项目。该函数如下所示:

 //Just a helper function for multiple GET requests with request.
let _request = (urls, cb) => {
    let results = {}, i = urls.length, c = 0;
    handler = (err, response, body) => {
        let url = response.request.uri.href;
        results[url] = { err, response, body };

        if (++c === urls.length) {
            cb(results);
        }
    };
    while (i--) {
        request(urls[i], handler);
    }
};
// function to update the prices in our Prices collection.

const update = (cb) => {
    Price.remove({}, (err, remove) => {
        if (err) {
            return logger.error(`Error removing items...`);
        }
        logger.info(`Removed all items... Beginning to update.`);
        _request(urls, (responses) => {
            let url, response, gameid;

            for (url in responses) {
                id = url.split('/')[5].split('?')[0];
                response = responses[url];

                if (response.err) {
                    logger.error(`Error in request to ${url}: ${err}`);
                    return;
                }

                if (response.body) {
                    logger.info(`Request to ${url} successful.`)
                    let jsonResult = {};
                    try {
                        jsonResult = JSON.parse(response.body);
                    } catch (e) {
                        logger.error(`Could not parse.`);
                    }

                    logger.info(`Response body for ${id} is ${Object.keys(jsonResult).length}.`);
                    let allItemsArray = Object.keys(jsonResult).map((key, index) => {
                        return {
                            itemid: id,
                            hash_name: key,
                            price: jsonResult[key]
                        }
                    });

                    Price.insertMany(allItemsArray).then(docs => {
                        logger.info(`Saved docs for ${id}`)
                    }, (e) => {
                        logger.error(`Error saving docs.`);
                    });

                }
            }
            if (cb && typeof cb == 'function') {
                cb();
            }
        })
    });

}

如您所见,为了避免遍历 100k+ 文档并分别更新每个文档,我在开始时将它们全部删除,然后调用为我提供这些商品价格的 API,并使用 InsertMany 将它们全部插入到我的价格集合中。

此更新过程每 30 分钟进行一次。

但我现在才意识到,如果某些用户想要查看价格,而我的 Prices Collection 目前是空的,因为它正在 self 更新怎么办?

问题

那么我是否必须遍历所有这些才能不删除它? (请记住,有许多文档每 30 分钟要更新一次。)或者有其他解决方案吗?

这是我的价格集合的图片(有 10 万个这样的文档,我只想更新价格属性):

The Prices Collection

更新:

我重写了一些update 函数,现在它看起来像这样:

const update = (cb = null) => {
    Price.remove({}, (err, remove) => {
        if (err) {
            return logger.error(`Error removing items...`);
        }
        logger.info(`Removed all items... Beginning to update.`);
        _request(urls, (responses) => {
            let url, response, gameid;

            for (url in responses) {
                gameid = url.split('/')[5].split('?')[0];
                response = responses[url];

                if (response.err) {
                    logger.error(`Error in request to ${url}: ${err}`);
                    return;
                }

                if (response.body) {
                    logger.info(`Request to ${url} successful.`)
                    let jsonResult = {};
                    try {
                        jsonResult = JSON.parse(response.body);
                    } catch (e) {
                        logger.error(`Could not parse.`);
                    }

                    logger.info(`Response body for ${gameid} is ${Object.keys(jsonResult).length}.`);
                    let allItemsArray = Object.keys(jsonResult).map((key, index) => {
                        return {
                            game_id: gameid,
                            market_hash_name: key,
                            price: jsonResult[key]
                        }
                    });
                    let bulk = Price.collection.initializeUnorderedBulkOp();

                    allItemsArray.forEach(item => {
                        bulk.find({market_hash_name: item.market_hash_name})
                            .upsert().updateOne(item);
                    });
                    bulk.execute((err, bulkers) => {
                        if (err) {
                            return logger.error(`Error bulking: ${e}`);
                        }
                        logger.info(`Updated Items for ${gameid}`)
                    });

                    // Price.insertMany(allItemsArray).then(docs => {
                    //     logger.info(`Saved docs for ${gameid}`)
                    // }, (e) => {
                    //     logger.error(`Error saving docs.`);
                    // });

                }
            }
            if (cb && typeof cb == 'function') {
                cb();
            }
        })
    });

}

现在请注意批量变量(感谢@Rahul),但现在,集合需要很长时间才能更新。我的处理器快耗尽了,更新 60k+ 文档实际上需要 3 分钟以上。老实说,我觉得像以前的方法,虽然它可能会删除所有这些然后重新插入它们,但它也需要 10 倍的速度。

有人吗?

最佳答案

根据我的经验(每小时更新数百万个 mongo 文档),这是一个非常大的批量更新的现实方法:

  • 单独执行所有 API 调用并将结果作为 bson 写入文件
  • 调用 mongoimport 并将该 bson 文件导入到一个新的空集合 prices_new 中。 Javascript,更不用说高级 OO 包装器,都太慢了
  • 重命名 prices_new -> prices dropTarget=true(这将是原子的,因此没有停机时间)

从原理上讲,它在 JS 中看起来像这样

let fname = '/tmp/data.bson';
let apiUrls = [...];

async function doRequest(url) {
    // perform a request and return an array of records
}

let responses  = await Promise.all(apiUrls.map(doRequest));

// if the data too big to fit in memory, use streams instead of this:

let data = flatMap(responses, BSON.serialize).join('\n'));
await fs.writeFile(fname, data);

await child_process.exec(`mongoimport --collection prices_new --drop ${fname}`);

await db.prices_new.renameCollection('prices', true);

关于javascript - 以最有效的方式更新许多(100k+)文档 MongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48852556/

相关文章:

javascript - AngularJS ng 单击多次调用

javascript - 为什么 Angularjs 不将 POST 发送到我的 Express 服务器?

node.js - 如何在 node.js 中将数组值作为可读流发出/传输?

c# - 如何避免使用 C# 在 MongoDB 中重复插入而不使用 "_id"键?

mongodb - 如何在mongodb中更新数组的子数组字段

javascript - 将 Doc 属性作为参数传递给 Mongodb 上的函数

javascript - WordPress 插件错误。备份伙伴

javascript - 循环遍历 HTML 元素

javascript - 我们可以在 Cordova 中使用 Twilio Javascript 客户端切换前后摄像头吗

javascript - 自写Node-RED Node 中require(<Module>)的问题