python - 如何使用 Python 在 MapReduce 中的 reducer 中输出键值对,以便 1 小时内的时间结束?

标签 python hadoop mapreduce

我有一种情况需要处理一个非常大的文本文件,格式如下:

ID \t time \t duration \t Description \t status

我想利用 MapReduce 来帮助我处理这个文件。我知道 MapReduce 基于键值对工作。 Mapper 将输出键和一些值,而 MapReduce 将确保所有相同的键最终都在 1 个 reducer 中。

我想要在 reducer 中结束的是时间间隔在 1 小时以内的行。然后在 reducer 中,我想访问所有其他信息以及 ID、持续时间、状态来做其他事情。所以我猜想输出的值是一个列表还是什么?

我有一些 Python 代码来处理输入数据。映射器.py

#!/usr/bin/env python
import sys
import re
for line in sys.stdin:
   line=line.strip()
   portions=re.split(r'\t+',line)
   time=portions[1]
#output key,value by print to stdout for reducer.py to read in.

请注意,我的数据集中的时间已经是 POSIX 时间格式。

我如何在 Mapper 中输出键值对来做到这一点?

我对 MapReduce/Hadoop 还是很陌生,感谢所有帮助。提前致谢!

最佳答案

这是一个策略:

  • 从 Mapper:发出每条记录的三个副本并使用二次排序:

    ( (复合键), 值) =

    • ((消息的小时-一小时,当前消息的精确时间), message)
    • ((消息的时间,消息的精确时间),消息)
    • ((消息小时+一小时,消息精确时间),消息)

现在:你需要标准的二次排序:

  • 仅将 Key 的前半部分(消息的小时)设置为 Partitioner
  • 将GroupingComparator 设置为仅键的前半部分(消息的小时)
  • setSortingComparator 为(消息的小时数,消息的精确时间)

在 reducer 中:每个 reducer 组在消息的精确时间 +/- 60 到 120 分钟内接收所有消息。 reducer 按排序顺序查看所有“消息的精确时间”。因此,您可以在过去 60 分钟内在每个 reducer

中保留查看的所有消息的滑动窗口

注意 以上假设 60 分钟消息的数据可以容纳在单个 reducer 任务的内存中。否则,您将需要将数据写入磁盘作为窗口功能的一部分。

更新 OP 要求进一步澄清窗口,所以我们开始吧。

从 Mapper 发出的键的角度考虑:每个输入记录有三个键。现在在 Reducer 上,这意味着每个输入记录出现在三个不同的组中。这样做的原因是我们需要针对每个输入记录同时考虑超前和滞后记录。所以现在我们让每个组都可以访问所有输入记录,这些记录可能在最早记录的 60 分钟内,也可能在最新记录的 60 分钟内。由于记录按每小时最早的秒数分组:这意味着 -60(分钟)到 +120(最大)与属于给定小时组内的任何记录相比。

关于python - 如何使用 Python 在 MapReduce 中的 reducer 中输出键值对,以便 1 小时内的时间结束?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28400772/

相关文章:

python - 多次运行 Celery/Django 单个任务

sqlite - 如何将SQLite数据库导入Hadoop HDFS

fetcher#1 随机播放中的 Hadoop 错误

javascript - 从 Python 执行 Javascript

python - Django:构造表单而不验证字段?

python - 简单的Python输入错误

hadoop - HDInsight安装后未创建HadoopDashboard

hadoop - 重新执行如何成为容错的主要来源?

java - 列表索引超出 map 的范围以减少Java中的作业

hadoop - 增加hdfs java堆内存的正确方法