mysql - 将 MySQL 查询转换为 mongoDB

标签 mysql mongodb mapreduce aggregation-framework

我已经开始学习 MongoDB,但遇到了一个问题。我有一个名为 server_logs 的集合。

它包含以下列(SOURCE_SERVER、SOURCE_PORT、DESTINATION_PORT、DESTINATION_SERVER、MBYTES)。

我需要 SOURCE_SERVER 以及传输到每个 SOURCE_SERVER 的 MBYTES 总量。(但还有一点是,如果存在任何 source_server 也存在于 target_server 中,那么它们的 MBYTES 也将添加到每个 SOURCE_SERVER 中)。

例如:我有下面的表结构

  SOURCE   S_PORT   DEST    D_PORT  MBYTES
1)server1   446    server2   555     10MB
2)server3   226    server1   666     2MB
3)server1   446    server3   226     5MB

我需要以下结果:

Server1  17MB
Server3  7MB

我在 mysql 中创建了一个查询,以根据传输到该 SOURCE 的数据的 MBYTES 计算到顶级 SOURCE。它工作正常,我通过此查询在 MYSQL 中获得了所需的结果。

SELECT SOURCE, DEST, sum( logs.MBYTES )+(
    SELECT SUM(log.MBYTES) as sum
    from logs as log
    where logs.DEST=log.SOURCE
) AS MBYTES

我想在 MongoDB 中进行此查询。请帮助..

提前致谢..

最佳答案

虽然这种“自连接”类型的查询对于您如何使用 MongoDB 执行此操作可能不会立即显而易见,但它可以使用聚合框架完成,但只需要稍微改变您的想法。

你的数据在 MongoDB 中以这种形式存在,它仍然非常像原始的 SQL 源:

{ 
    "source" : "server1",
    "s_port" : 446,
    "dest" : "server2", 
    "d_port" : 555, 
    "transferMB" : 10
},
{ 
    "source" : "server3",
    "s_port" : 226,
    "dest" : "server1",
    "d_port" : 666,
    "transferMB" : 2
},
{ 
    "source" : "server1",
    "s_port" : 446, 
    "dest" : "server3",
    "d_port" : 226,
    "transferMB" : 5
}

使用 MongoDB 2.6 之前的版本,您的查询将如下所示:

db.logs.aggregate([

    // Project a "type" tag in order to transform, then unwind
    { "$project": {
         "source": 1,
         "dest": 1,
         "transferMB": 1,
         "type": { "$cond": [ 1,[ "source", "dest" ],0] }
    }},
    { "$unwind": "$type" },

    // Map the "source" and "dest" servers onto the type, keep the source       
    { "$project": {
        "type": 1,
        "tag": { "$cond": [
            { "$eq": [ "$type", "source" ] },
            "$source",
            "$dest"
        ]},
        "mbytes": "$transferMB",
        "source": 1
    }},

    // Group for totals, keep an array of the "source" for each
    { "$group": {
        "_id": "$tag",
        "mbytes": { "$sum": "$mbytes" },
        "source": { "$addToSet": "$source" }
    }},


    // Unwind that array
    { "$unwind": "$source" },

    // Is our grouped tag one on the sources? Inner join simulate
    { "$project": {
        "mbytes": 1,
        "matched": { "$eq": [ "$source", "$_id" ] }
    }},

    // Filter the results that did not match
    { "$match": { "matched": true }},


    // Discard duplicates for each server tag
    { "$group": { 
        "_id": "$_id",
        "mbytes": { "$first": "$mbytes" }
    }}
])

对于 2.6 及更高版本,您可以获得一些额外的运算符来简化此操作,或者至少使用不同的运算符:

db.logs.aggregate([

    // Project a "type" tag in order to transform, then unwind
    { "$project": {
         "source": 1,
         "dest": 1,
         "transferMB": 1,
         "type": { "$literal": [ "source", "dest" ] }
    }},
    { "$unwind": "$type" },

    // Map the "source" and "dest" servers onto the type, keep the source       
    { "$project": {
        "type": 1,
        "tag": { "$cond": [
            { "$eq": [ "$type", "source" ] },
            "$source",
            "$dest"
        ]},
        "mbytes": "$transferMB",
        "source": 1
    }},

    // Group for totals, keep an array of the "source" for each
    { "$group": {
        "_id": "$tag",
        "mbytes": { "$sum": "$mbytes" },
        "source": { "$addToSet": "$source" }
    }},

    // Co-erce the server tag into an array ( of one element )
    { "$group": {
        "_id": "$_id",
        "mbytes": { "$first": "$mbytes" },
        "source": { "$first": "$source" },
        "tags": { "$push": "$_id" }
    }},

    // User set intersection to find common element count of arrays
    { "$project": {
       "mbytes": 1,
       "matched": { "$size": { 
           "$setIntersection": [
               "$source",
               "$tags"
           ]
       }}
    }},

    // Filter those that had nothing in common
    { "$match": { "matched": { "$gt": 0 } }},

    // Remove the un-required field
    { "$project": { "mbytes": 1 }}
])

两种形式都会产生结果:

{ "_id" : "server1", "mbytes" : 17 }
{ "_id" : "server3", "mbytes" : 7 }

两者的一般原则是,通过保留有效“源”服务器的列表,您可以“过滤”组合结果,以便只有那些被列为源的服务器才会记录其总传输量。

因此,您可以使用多种技术来“ reshape ”、“组合”和“过滤”您的文档以获得您想要的结果。

阅读更多关于 aggregation operators 的信息同样值得一看的介绍是 SQL to Aggregation mapping chart在文档中为您提供转换常见操作的一些想法。

连浏览在 Stack Overflow 上标记此处以查找一些有趣的转换操作。

关于mysql - 将 MySQL 查询转换为 mongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23442282/

相关文章:

hadoop - MapReduce内部加入?

php - PHP Mysql Codeigniter 应用程序中的 session 变量大小增加

mysql - 无法连接到 docker 镜像内的 MySQL

MYSQL 数据库未连接、Node.JS、NPM MYSQL

mongodb - 有没有办法使用 "update"函数在给定查询和排序选项的情况下更新 MongoDB 集合中的第一个文档?

hadoop - Job 实例在 Hadoop 的构造函数中获取作业列表吗?

mysql - 插入或更新行时更新 mysql 表

ruby-on-rails - Rails4 和 MongoMapper

python - 如何使用 python (pymongo) 将嵌套文档插入到 mongodb

hadoop - 具有 HAR 文件输入的 MapReduce 作业