java - 级联 - 合并 2 个聚合

标签 java hadoop cascading

我有以下问题,我试图通过级联解决:我有结构为 o,a,f,i,c 的记录的 csv 文件

我需要按 o、a、f 聚合记录,并对每组 i 和 c 求和。

例如:

100,200,300,5,1

100,200,300,6,2

101,201,301,20,5

101,201,301,21,6

应该产生:

100,200,300,11,3

101,201,301,41,11

我不明白如何合并我拥有的 2 个 Every 实例(我可以同时聚合两个字段吗?)。

你有什么想法吗?

尤西

public class CascMain {

public static void main(String[] args){

    Scheme sourceScheme = new TextLine(new Fields("line"));
    Tap source = new Lfs(sourceScheme, "/tmp/casc/group.csv");

    Scheme sinkScheme = new TextDelimited(new Fields("o", "a", "f", "ti", "tc"), ",");
    Tap sink = new Lfs(sinkScheme, "/tmp/casc/output/", SinkMode.REPLACE);

    Pipe assembly = new Pipe("agg-pipe");

    Function function = new RegexSplitter(new Fields("o", "a", "f", "i", "c"), ",");
    assembly = new Each(assembly, new Fields("line"), function);

    Pipe groupAssembly = new GroupBy("group", assembly, new Fields("o", "a", "f"));

    Sum impSum = new Sum(new Fields("ti"));
    Pipe i = new Every(groupAssembly, new Fields("i"), impSum);

    Sum clickSum = new Sum(new Fields("tc"));
    Pipe c = new Every(groupAssembly, new Fields("c"), clickSum);

    // WHAT SHOULD I DO HERE

    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass(properties, CascMain.class);

    FlowConnector flowConnector = new FlowConnector(properties);
    Flow flow = flowConnector.connect("agg", source, sink, assembly);
    flow.complete();

}

最佳答案

使用AggregateBy同时聚合多个字段:

SumBy impSum = new SumBy(new Fields("i"), new Fields("ti"), long.class);
SumBy clickSum = new SumBy(new Fields("c"), new Fields("tc"), long.class);
assembly = new AggregateBy("totals", Pipe.pipes(assembly), new Fields("o", "a", "f"), 2, impSum, clickSum);

关于java - 级联 - 合并 2 个聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7009193/

相关文章:

python - 使用 Hive 的 TRANSFORM 命令在 Hadoop 表中写入 python 脚本的输出

mapreduce - 在哪里可以找到 hbase-0.89.20100924+28 的 HBase 级联模块?

java - Commons配置库添加元素

java - 不理解二维数组的每个循环

java - 使用子字符串时出错 : String Index out of range : -1 *Java*

hadoop - 是否有连接 Cassandra 和 Hadoop 的权威指南?

hadoop - hadoop的classpath在哪里设置

java - 更换 fragment 时内存泄漏

hadoop - 添加对 scalding 的 parquet-avro 支持

hadoop - 级联Hadoop文件加载-处理跨越换行符的记录的方法?