hadoop - 如何使用MapReduce处理日志文件

标签 hadoop mapreduce bigdata

我想了解如何使用MapReduce处理日志文件。

例如,如果我有这样的文件传输日志:

Start_Datestamp,file_name,source, host,file_size,transfered_size
2012-11-18 T 16:05:00.000, FileA,SourceA, HostA,1Gb, 500Mb
2012-11-18 T 16:25:00.000, FileA,SourceA, HostB,1Gb, 500Mb

2012-11-18 T 16:33:00.000, FileB,SourceB, HostB,2Gb, 2GB

2012-11-18 T 17:07:00.000, FileC,SourceC, HostA,1Gb, 500Mb
2012-11-18 T 17:19:00.000, FileB,SourceC, HostA,1Gb, 500Mb
2012-11-18 T 17:23:00.000, FileA,SourceC, HostC,1Gb, 500Mb

我想像这样聚合和输出:
Start_Datestamp,file_name,source, Total_transfered_size
2012-11-18 T 16:00, FileA,SourceA, 1000Mb

2012-11-18 T 16:30, FileB,SourceB,  2GB

2012-11-18 T 17:00, FileC,SourceC,500Mb
2012-11-18 T 17:00, FileB,SourceC, 500Mb
2012-11-18 T 17:00, FileA,SourceC, 500Mb

如上所示,它应以30分钟为间隔汇总文件传输。

我设法使用下面的教程实现了30分钟的间隔聚合:
http://www.informit.com/articles/article.aspx?p=2017061

但这很简单:
Start_Datestamp,count
2012-11-18 T 16:00, 2
2012-11-18 T 16:30, 1
2012-11-18 T 17:00,3

但不确定如何使用其他字段。我尝试使用WritableComparable创建复合键来组成Start_Datestamp,file_name,source,但是无法正常工作。有人可以指导我吗?

更新!

所以现在我设法使用Sudarshan的建议打印多个字段。但是,我遇到了另一个问题。

例如,让我们看一下上表中的样本数据:
Start_Datestamp,file_name,source, host,file_size,transfered_size
2012-11-18 T 16:05:00.000, FileA,SourceA, HostA,1Gb, 500Mb
2012-11-18 T 16:25:00.000, FileA,SourceA, HostB,1Gb, 500Mb
2012-11-18 T 16:28:00.000, FileA,SourceB, HostB,1Gb, 500Mb

我想做的是按时间戳将数据分组,间隔为30分钟,来源,sum(transfered_size)

所以它是这样的:
Start_Datestamp,source, Total_transfered_size
2012-11-18 T 16:00,SourceA, 1000Mb <<==Please see those two records are now merged to '16:00' timestamp .
2012-11-18 T 16:00,SourceB, HostB,1Gb, 500Mb <<===this record should not be merged because different source, even though the timetamp is within '16:00' frame.

但是在我的情况下,发生的是每个时间间隔仅打印第一条记录

例如
Start_Datestamp,来源,Total_transfered_size
2012-11-18 T 16:00,SourceA,1000Mb << ==仅打印此记录。另一个不打印。

在我的Map类中,我添加了以下小节:
out = "," + src_loc + "," + dst_loc + "," + remote     + ","
+ transfer + " " + activity + ","     + read_bytes+ "," 
+ write_bytes + ","     + file_name + " " 
+ total_time + "," + finished;  

date.setDate(calendar.getTime()); 

output.collect(date, new Text(out));

然后在 reducer 中:
String newline = System.getProperty("line.separator");
   while (values.hasNext()) { 
out += values.next().toString() + newline;
}     

output.collect(key, new Text(out));

我认为问题在于 reducer 迭代。

我尝试在while循环内移动以下代码,该循环似乎正在打印所有记录。但是我不确定这是否是正确的方法。任何建议将不胜感激。
output.collect(key, new Text(out));

最佳答案

您现在正沿着正确的道路前进,而不是在值中传递1。
custom_key将以30分钟为间隔的时间

 output.collect(custom_key, one);

您可以传递整个日志文本。
output.collect(customkey, log_text);

然后,在化简器中,您将以可迭代的方式收到整个日志文本。在您的化简器中解析它并使用相关字段。
map<source,datatransferred>
for loop on iterable
   parse log text line
   extract file_name,source, Total_transffered_size
   store the sum of data into the map against the source
end loop

for loop on map
    output time,source,sum calculated in above step
end loop

答案有两个假设
  • 不要介意多个输出文件
  • 与输出
  • 的顺序无关

    关于hadoop - 如何使用MapReduce处理日志文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23689653/

    相关文章:

    python - 在 'from_delayed' JSON 文件中发现 DASK 元数据不匹配

    java - 无法从Hive更改表格位置

    java - Hadoop Mapreduce MultipleInputs 无法加载映射器类

    java - 象夫 : Cannot convert into sequence file

    java - 使用Java在HBase中检索第N个限定词

    hadoop - 在将文件提供给HDFS之前如何对其进行串联?

    hadoop - 使用命令行(CDH 5)启动Hadoop服务

    hadoop - 是否需要在jar文件中提供Oozie中的Java Action ?

    用于图像存储的 Hadoop (HDFS)

    json - HQL返回ISO时间戳