javascript - 为数组的每个元素发出查询

标签 javascript node.js mongodb

我目前正在查询我的 mondo db 以获取一个集合中的一个 url 数组,该集合返回一个数组。然后,我想使用该数组遍历另一个集合,并在上一个查询的返回数组中找到每个元素的匹配元素。在数组上使用 forEach 并进行单独查询是否合适?
我的代码看起来像这样,第一个函数 getUrls 效果很好。我得到的当前错误是:

(node:10754) UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): TypeError: Cannot read property 'limit' of undefined (node:10754) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.


async function useUrls () {
    let domains = await getUrls()
    let db = await mongo.connect("mongodb://35.185.206.31:80/lc_data")
    let results = []
    domains.forEach( domain =>{ 
        let query = {"$match": 
                        {"email_domain": domain}
                    }
        let cursor = db.collection('circleback')
            .aggregate([query], (err, data) =>{
                if(err)
                    throw err;
                console.log("cb", data)
            }).limit(1100)
    })

最佳答案

如前所述,问题中的代码存在一些问题,其中大部分问题可以通过查看本回复末尾提供的完整示例列表来解决。您在这里本质上要求的是“Top-N results”问题的变体,有几种方法可以“实际”处理这个问题。

所以有点从“最差”到“最好”的排名:

聚合 $slice

因此,您可以交替提供 ,而不是“循环”您的函数结果。全部 使用 $in 查询的结果.这减轻了“循环输入”的需要,但这里需要的另一件事是“每个输出的前 N ​​个”。

MongoDB 中确实没有一个“稳定”的机制来解决这个问题,但是“如果”它在给定集合的大小上是合理的,那么你实际上可以简单地 $group 在与提供的 $in 匹配的“不同”键上参数,然后是 $push 所有文档放入一个数组和 $slice 结果:

let results = await db.collection('circleback').aggregate([
  { "$match": { "email_domain": { "$in": domains } } },
  { "$group": {
    "_id": "$email_domain",
    "docs": { "$push": "$$ROOT" }
  }},
  { "$sort": { "_id": 1 } },
  { "$addFields": { "docs": { "$slice": [ "$docs", 0, 1100 ] } } }
]).toArray();

这里“更广泛”的问题是 MongoDB 无法“限制”初始 $push 上的数组内容。 .而这实际上是一个长期悬而未决的问题。 SERVER-9377 .

因此,虽然我们可以“理论上”进行此类操作,但它通常根本不实用,因为 16MB BSON 限制通常会限制“初始”数组大小,即使 $slice 也是如此。结果确实会保持在该上限之下。

串行循环执行异步/等待

你的代码表明你是在这个环境下运行的,所以我建议你实际使用它。只需await在源的每次循环迭代中:
let results = [];
for ( let domain of domains ) {
  results = results.concat(
    await db.collection('circleback').find({ "email_domain": domain })
      .limit(1100).toArray()
  );
}

简单的函数允许您执行此操作,例如返回 .find() 的标准游标结果通过 .toArray() 作为数组然后使用 .concat()加入以前的结果数组。

它简单有效,但我们可能会做得更好

异步方法的并发执行

所以不要使用“循环”和await在每个调用的异步函数上,您可以改为同时执行它们(或至少“大多数”)。这实际上是您目前在问题中提出的问题的一部分,因为实际上没有什么“等待”循环迭代。

我们可以使用 Promise.all() 为了有效地做到这一点,但是如果它实际上是一个“非常大”数量的同时运行的 promise ,这将遇到与所经历的相同的问题,即超出调用堆栈。

为了避免这种情况,我们仍然可以使用 Bluebird Promise 和 Promise.map() 的好处。 .这有一个“并发限制器”选项,它只允许指定数量的操作同时执行:
let results = [].concat.apply([],
  await Promise.map(domains, domain => 
    db.collection('circleback').find({ "email_domain": domain })
      .limit(1100).toArray()
  ,{ concurrency: 10 })
);

事实上,您甚至应该能够使用 Bluebird promise “插入”.map() 之类的库。任何其他返回 Promise 的功能,例如您的“源”函数返回 "domains" 的列表.然后您可以像后面的示例中所示的那样“链接”。

future 的 MongoDB

MongoDB 的 future 版本(来自 MongoDB 3.6)实际上有一个新的“非相关”形式 $lookup 这允许这里有一个特殊情况。所以回到最初的聚合示例,我们可以得到每个匹配键的“不同”值,然后 $lookup "pipeline"然后将允许 $limit 的参数应用于结果。
let results = await db.collection('circleback').aggregate([
  { "$match": { "email_domain": { "$in": domains } } },
  { "$group": { "_id": "$email_domain"  }},
  { "$sort": { "_id": 1 } },
  { "$lookup": {
     "from": "circleback",
     "let": {
       "domain": "$_id"
     },
     "pipeline": [
       { "$redact": {
         "$cond": {
           "if": { "$eq": [ "$email_domain", "$$domain" ] },
           "then": "$$KEEP",
           "else": "$$PRUNE"
         }
       }},
       { "$limit": 1100 }
     ],
     "as": "docs"
  }}
]).toArray();

这将始终保持在 16MB BSON 限制之下,当然假设 $in 的参数允许这种情况。

示例 list

作为一个完整的示例 list ,您可以运行,并且通常使用默认数据集创建非常大。它演示了上述所有技术以及要遵循的一些一般使用模式。
const mongodb = require('mongodb'),
      Promise = require('bluebird'),
      MongoClient = mongodb.MongoClient,
      Logger = mongodb.Logger;

const uri = 'mongodb://localhost/bigpara';

function log(data) {
  console.log(JSON.stringify(data,undefined,2))
}

(async function() {

  let db;

  try {
    db = await MongoClient.connect(uri,{ promiseLibrary: Promise });

    Logger.setLevel('info');
    let source = db.collection('source');
    let data = db.collection('data');

    // Clean collections
    await Promise.all(
      [source,data].map( coll => coll.remove({}) )
    );

    // Create some data to work with

    await source.insertMany(
      Array.apply([],Array(500)).map((e,i) => ({ item: i+1 }))
    );

    let ops = [];
    for (let i=1; i <= 10000; i++) {
      ops.push({
        item: Math.floor(Math.random() * 500) + 1,
        index: i,
        amount: Math.floor(Math.random() * (200 - 100 + 1)) + 100
      });
      if ( i % 1000 === 0 ) {
        await data.insertMany(ops,{ ordered: false });
        ops = [];
      }
    }

    /* Fetch 5 and 5 example
     *
     * Note that the async method to supply to $in is a simulation
     * of any real source that is returning an array
     *
     * Not the best since it means ALL documents go into the array
     * for the selection. Then you $slice off only what you need.
     */
    console.log('\nAggregate $in Example');
    await (async function(source,data) {
      let results = await data.aggregate([
        { "$match": {
          "item": {
            "$in": (await source.find().limit(5).toArray()).map(d => d.item)
          }
        }},
        { "$group": {
          "_id": "$item",
          "docs": { "$push": "$$ROOT" }
        }},
        { "$addFields": {
          "docs": { "$slice": [ "$docs", 0, 5 ] }
        }},
        { "$sort": { "_id": 1 } }
      ]).toArray();
      log(results);
    })(source,data);


    /*
     * Fetch 10 by 2 example
     *
     * Much better usage of concurrent processes and only get's
     * what is needed. But it is actually 1 request per item
     */
    console.log('\nPromise.map concurrency example');
    await (async function(source,data) {
      let results = [].concat.apply([],
        await source.find().limit(10).toArray().map(d =>
          data.find({ item: d.item }).limit(2).toArray()
        ,{ concurrency: 5 })
      );
      log(results);
    })(source,data);

    /*
     * Plain loop async/await serial example
     *
     * Still one request per item, requests are serial
     * and therefore take longer to complete than concurrent
     */
    console.log('\nasync/await serial loop');
    await (async function(source,data) {
      let items = (await source.find().limit(10).toArray());
      let results = [];
      for ( item of items ) {
        results = results.concat(
          await data.find({ item: item.item }).limit(2).toArray()
        );
      }
      log(results);
    })(source,data);


    /*
     * Non-Correlated $lookup example
     *
     * Uses aggregate to get the "distinct" matching results and then does
     * a $lookup operation to retrive the matching documents to the
     * specified $limit
     *
     * Typically not as efficient as the concurrent example, but does
     * actually run completely on the server, and does not require
     * additional connections.
     *
     */

    let version = (await db.db('admin').command({'buildinfo': 1})).version;
    if ( version >= "3.5" ) {
      console.log('\nNon-Correlated $lookup example $limit')
      await (async function(source,data) {
        let items = (await source.find().limit(5).toArray()).map(d => d.item);

        let results = await data.aggregate([
          { "$match": { "item": { "$in": items } } },
          { "$group": { "_id": "$item" } },
          { "$sort": { "_id": 1 } },
          { "$lookup": {
            "from": "data",
            "let": {
              "itemId": "$_id",
            },
            "pipeline": [
              { "$redact": {
                "$cond": {
                  "if": { "$eq": [ "$item", "$$itemId" ] },
                  "then": "$$KEEP",
                  "else": "$$PRUNE"
                }
              }},
              { "$limit": 5 }
            ],
            "as": "docs"
          }}
        ]).toArray();
        log(results);

      })(source,data);
    } else {
      console.log('\nSkipped Non-Correlated $lookup demo');
    }

  } catch(e) {
    console.error(e);
  } finally {
    db.close();
  }

})();

关于javascript - 为数组的每个元素发出查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45383044/

相关文章:

node.js - TypeScript 与现有 Node js Express 项目集成

node.js - Mongoose 模式方法是 "not a function"

javascript - Mongoose 模型静态/方法不保存 "this"中的值

java - 找不到类 org.json.JSONArray 的编解码器

javascript - Google Analytics API 网站上的当前用户

javascript - 从 json 源导入图像 (create-react-app)

javascript - 在 Javascript 中获取本地 HTML 文件的 HTML 代码

mysql - 使用 ON DUPLICATE KEY 将列增加一定数量 MySQL NodeJS

javascript - NodeJS 初学者问题。 "Create a download for a user"并避免多个相同的服务器调用

javascript - VS代码: trigger organizeImports when git staging