mongodb - mongoDB 中具有时间间隔的增量 MapReduce

标签 mongodb mongoid mongoose mongodb-.net-driver mongomapper

我以 10 分钟的时间间隔从服务器获取了一些记录(在 1 小时内我将获取 6 个文件) 我想在接下来的几个小时内每 1 小时做一次 map reduce 我将不得不对 6 个文件和最后几个小时的文件进行下一组的 map reduce 我将如何解决这个问题? 帮我 我混淆了过去 1 个月 谢谢 苏希尔·克尔·辛格

最佳答案

为了按小时汇总您的 10 分钟日志文件,您可以在 map 函数中将每个日志文件的时间戳舍入到最接近的小时,并在 reduce 函数中按小时对结果进行分组。

这是一个小的虚拟示例,它从 mongo shell 中说明了这一点:

  1. 创建 100 个日志文件,每个间隔 10 分钟并包含一个 0-10 之间的随机数,并将它们插入数据库的 logs 集合中:

    for (var i = 0; i < 100; i++) { 
        d = new ISODate();
        d.setMinutes(d.getMinutes() + i*10);
        r = Math.floor(Math.random()*11)
        db.logs.insert({timestamp: d, number: r})
    }
    

    要检查 logs 集合是什么样的,请发送类似 db.logs.find().limit(3).pretty() 的查询,结果为:

    {
        "_id" : ObjectId("50455a3570537f9433c1efb2"),
        "timestamp" : ISODate("2012-09-04T01:32:37.370Z"),
        "number" : 2
    }
    {
        "_id" : ObjectId("50455a3570537f9433c1efb3"),
        "timestamp" : ISODate("2012-09-04T01:42:37.370Z"),
        "number" : 3
    }
    {
        "_id" : ObjectId("50455a3570537f9433c1efb4"),
        "timestamp" : ISODate("2012-09-04T01:52:37.370Z"),
        "number" : 8
    }
    
  2. 定义一个 map 函数(在本例中称为 mapf),将时间戳四舍五入到最接近的小时(向下舍入),用于 emit 键。 emit 值是该日志文件的编号。

    mapf = function () { 
        // round down to nearest hour
        d = this.timestamp;
        d.setMinutes(0);
        d.setSeconds(0);
        d.setMilliseconds(0);
        emit(d, this.number); 
    }
    
  3. 定义一个 reduce 函数,对所有发出的值(即数字)求和。

    reducef = function (key, values) {
        var sum = 0;
        for (var v in values) {
            sum += values[v];
        }
        return sum;
    }
    
  4. 现在对日志集合执行 map/reduce。此处的 out 参数指定我们要将结果写入 hourly_logs 集合并将现有文档与新结果合并。这确保稍后提交的日志文件(例如,在服务器故障或其他延迟之后)一旦出现在日志中,就会包含在结果中。

    db.logs.mapReduce(mapf, reducef, {out: { merge : "hourly_logs" }})
    
  5. 最后,要查看结果,您可以在 hourly_logs 上查询一个简单的查找:

    db.hourly_logs.find()
    
    { "_id" : ISODate("2012-09-04T02:00:00Z"), "value" : 33 }
    { "_id" : ISODate("2012-09-04T03:00:00Z"), "value" : 31 }
    { "_id" : ISODate("2012-09-04T04:00:00Z"), "value" : 21 }
    { "_id" : ISODate("2012-09-04T05:00:00Z"), "value" : 40 }
    { "_id" : ISODate("2012-09-04T06:00:00Z"), "value" : 26 }
    { "_id" : ISODate("2012-09-04T07:00:00Z"), "value" : 26 }
    { "_id" : ISODate("2012-09-04T08:00:00Z"), "value" : 25 }
    { "_id" : ISODate("2012-09-04T09:00:00Z"), "value" : 46 }
    { "_id" : ISODate("2012-09-04T10:00:00Z"), "value" : 27 }
    { "_id" : ISODate("2012-09-04T11:00:00Z"), "value" : 42 }
    { "_id" : ISODate("2012-09-04T12:00:00Z"), "value" : 43 }
    { "_id" : ISODate("2012-09-04T13:00:00Z"), "value" : 35 }
    { "_id" : ISODate("2012-09-04T14:00:00Z"), "value" : 22 }
    { "_id" : ISODate("2012-09-04T15:00:00Z"), "value" : 34 }
    { "_id" : ISODate("2012-09-04T16:00:00Z"), "value" : 18 }
    { "_id" : ISODate("2012-09-04T01:00:00Z"), "value" : 13 }
    { "_id" : ISODate("2012-09-04T17:00:00Z"), "value" : 25 }
    { "_id" : ISODate("2012-09-04T18:00:00Z"), "value" : 7 }
    

结果是 10 分钟日志的每小时摘要,其中 _id 字段包含该小时的开始,而 value 字段包含随机数的总和。在您的情况下,您可能有不同的聚合运算符;根据您的需要修改 reduce 函数。

正如 Sammaye 在评论中提到的,您可以使用每小时运行的 cron 作业条目自动执行 map/reduce 调用。

如果您不想每次都处理完整的日志收集,您可以通过将文档限制为每小时时间窗口来运行增量更新,如下所示:

var q = { $and: [ {timestamp: {$gte: new Date(2012, 8, 4, 12, 0, 0) }},                                                  
                  {timestamp: {$lt:  new Date(2012, 8, 4, 13, 0, 0) }} ] }

db.logs.mapReduce(mapf, reducef, {query: q, out: { merge : "hourly_logs" }})

这将只包括 12 点到 13 点之间的日志文件。请注意 Date() 对象中的月份值从 0 开始(8 = 九月)。由于 merge 选项,在已处理的日志文件上运行 m/r 是安全的。

关于mongodb - mongoDB 中具有时间间隔的增量 MapReduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12253205/

相关文章:

javascript - 查询指针数组 Mongoose Node.js

javascript - Mongoose - exec 函数有什么作用?

node.js - 如何通过 $lookup 对 'joined' 集合执行 $text 搜索?

ruby-on-rails - 如何在 Mongoid 中构建复杂的作用域

arrays - Mongodb 对聚合方面的两个相似数据数组进行分组和求和

ruby-on-rails - 使用 mongolab 的 Heroku 的 mongodb.yml

ruby-on-rails - MongoDB ruby 日期

javascript - 如何使用一组 ID 仅查找集合中的最新文档

python - 使用 PyMongo 的自签名 SSL 连接

django - Pymongo 查询正常,但 Mongoengine 什么也不查询