我想了解如何使用MapReduce处理日志文件。
例如,如果我有这样的文件传输日志:
Start_Datestamp,file_name,source, host,file_size,transfered_size
2012-11-18 T 16:05:00.000, FileA,SourceA, HostA,1Gb, 500Mb
2012-11-18 T 16:25:00.000, FileA,SourceA, HostB,1Gb, 500Mb
2012-11-18 T 16:33:00.000, FileB,SourceB, HostB,2Gb, 2GB
2012-11-18 T 17:07:00.000, FileC,SourceC, HostA,1Gb, 500Mb
2012-11-18 T 17:19:00.000, FileB,SourceC, HostA,1Gb, 500Mb
2012-11-18 T 17:23:00.000, FileA,SourceC, HostC,1Gb, 500Mb
我想像这样聚合和输出:
Start_Datestamp,file_name,source, Total_transfered_size
2012-11-18 T 16:00, FileA,SourceA, 1000Mb
2012-11-18 T 16:30, FileB,SourceB, 2GB
2012-11-18 T 17:00, FileC,SourceC,500Mb
2012-11-18 T 17:00, FileB,SourceC, 500Mb
2012-11-18 T 17:00, FileA,SourceC, 500Mb
如上所示,它应以30分钟为间隔汇总文件传输。
我设法使用下面的教程实现了30分钟的间隔聚合:
http://www.informit.com/articles/article.aspx?p=2017061
但这很简单:
Start_Datestamp,count
2012-11-18 T 16:00, 2
2012-11-18 T 16:30, 1
2012-11-18 T 17:00,3
但不确定如何使用其他字段。我尝试使用WritableComparable创建复合键来组成Start_Datestamp,file_name,source,但是无法正常工作。有人可以指导我吗?
更新!
所以现在我设法使用Sudarshan的建议打印多个字段。但是,我遇到了另一个问题。
例如,让我们看一下上表中的样本数据:
Start_Datestamp,file_name,source, host,file_size,transfered_size
2012-11-18 T 16:05:00.000, FileA,SourceA, HostA,1Gb, 500Mb
2012-11-18 T 16:25:00.000, FileA,SourceA, HostB,1Gb, 500Mb
2012-11-18 T 16:28:00.000, FileA,SourceB, HostB,1Gb, 500Mb
我想做的是按时间戳将数据分组,间隔为30分钟,来源,sum(transfered_size)
所以它是这样的:
Start_Datestamp,source, Total_transfered_size
2012-11-18 T 16:00,SourceA, 1000Mb <<==Please see those two records are now merged to '16:00' timestamp .
2012-11-18 T 16:00,SourceB, HostB,1Gb, 500Mb <<===this record should not be merged because different source, even though the timetamp is within '16:00' frame.
但是在我的情况下,发生的是每个时间间隔仅打印第一条记录
例如
Start_Datestamp,来源,Total_transfered_size
2012-11-18 T 16:00,SourceA,1000Mb << ==仅打印此记录。另一个不打印。
在我的Map类中,我添加了以下小节:
out = "," + src_loc + "," + dst_loc + "," + remote + ","
+ transfer + " " + activity + "," + read_bytes+ ","
+ write_bytes + "," + file_name + " "
+ total_time + "," + finished;
date.setDate(calendar.getTime());
output.collect(date, new Text(out));
然后在 reducer 中:
String newline = System.getProperty("line.separator");
while (values.hasNext()) {
out += values.next().toString() + newline;
}
output.collect(key, new Text(out));
我认为问题在于 reducer 迭代。
我尝试在while循环内移动以下代码,该循环似乎正在打印所有记录。但是我不确定这是否是正确的方法。任何建议将不胜感激。
output.collect(key, new Text(out));
最佳答案
您现在正沿着正确的道路前进,而不是在值中传递1。
custom_key将以30分钟为间隔的时间
output.collect(custom_key, one);
您可以传递整个日志文本。
output.collect(customkey, log_text);
然后,在化简器中,您将以可迭代的方式收到整个日志文本。在您的化简器中解析它并使用相关字段。
map<source,datatransferred>
for loop on iterable
parse log text line
extract file_name,source, Total_transffered_size
store the sum of data into the map against the source
end loop
for loop on map
output time,source,sum calculated in above step
end loop
答案有两个假设
关于hadoop - 如何使用MapReduce处理日志文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23689653/