java - 在Java中应用MapReduce

标签 java mapreduce set tuples java-stream

我对于流、mapreducefilter 来说还是个初学者。

我从 Cassandra 表中获取行列表,其中包含三个字段 vehicleTypenoOfVehiclestaxPerPspecialVehicleType

我想准备一组这 3 个三元组,以便添加任何特定类型的车辆数量,同时三元组还应包含特定车辆类型税收的算术平均值。

我正在应用我的映射,例如:

session.execute(statement).all().stream()
            .map(row -> new ImmutablePair<>(row.getString("vehicleType"), new ImmutablePair<>(row.getInt("noOfVehicles"), row.getFloat("tollTaxOfParticularType") * row.getInt("noOfVehicles"))))
            .reduce(x->{

            });

并且我无法应用减少,因此应将其添加到以下集合中:

Set<Triple<String,Integer,Double>> set = new HashSet<>();

我举了一个例子来说明我想通过 Map-Reduce 实现什么:

我正在映射来 self 的表的三个字段(vehicleType、noOfVehicle、taxOfPspecialVehicle),例如:

(vehicleType,(noOfVehicle,noOfVehicle*taxOfParticularVehicle))

假设映射给了我一个像这样的数组:

[("A",(12,48)),("A",(10,30)),("B",(3,30)),("B",(4,70))]

最后我想将其减少到以下集合:

[("A",22,39),("B",7,50)]

这样就可以对 noOfVehicles 进行求和,而税收是该组中车辆税的算术平均值。

最佳答案

如果不进行多次流传输或在外部维护可变状态,这有点棘手。这些方法的最干净的替代方案似乎是编写自定义 Collector .

我对 Pair 不太满意, Triple以及什么不是,所以我使用具体类来进行说明: Data是单个数据点的持有者,对应于您的三重数据。

static final class Data {
    final String type;
    final int noOfVehicles;
    final double totalTax;
    Data(String type, int noOfVehicles, double totalTax) {
        this.type = type;
        this.noOfVehicles = noOfVehicles;
        this.totalTax = totalTax;
    }
}

接下来,我们需要一个在可变归约期间保存状态的辅助类,我将其称为 Stats :

static final class Stats {
    int noOfVehiclesSum;
    double totalTaxSum;
    int count;

    @Override
    public String toString() {
        return "Stats{" + "noOfVehiclesSum=" + noOfVehiclesSum +
               ", averageTax=" + (totalTaxSum / count) + '}';
    }
}

让我们创建一个测试数据列表

List<Data> l = Arrays.asList(new Data("A", 12, 48.0),
                             new Data("A", 10, 30.0),
                             new Data("B", 3 , 30.0),
                             new Data("B", 4 , 70.0),
                             new Data("B", 5 , 20.0));

我想要的最终减少结果是 Map<String, Stats>包含从车辆类型到 Stats 的映射该类型的对象(包含该类型的车辆数量和平均税收总和)。

在此示例中:{A=Stats{noOfVehiclesSum=22, averageTax=39.0}, B=Stats{noOfVehiclesSum=12, averageTax=40.0}}

我不知道有什么比编写您自己的定制更好的解决方案 Collector在这个例子中,看起来有点像下面这样:

static class StatsCollector implements Collector<Data, Stats, Stats> {
    @Override
    public Supplier<Stats> supplier() {
        return Stats::new;
    }

    @Override
    public BiConsumer<Stats, Data> accumulator() {
        return (stats, data) -> {
            stats.noOfVehiclesSum += data.noOfVehicles;
            stats.totalTaxSum += data.totalTax;
            stats.count += 1;
        };
    }

    @Override
    public BinaryOperator<Stats> combiner() {
        return (lft, rght) -> {
            lft.noOfVehiclesSum += rght.noOfVehiclesSum;
            lft.totalTaxSum += rght.totalTaxSum;
            lft.count += rght.count;
            return lft;
        };
    }

    @Override
    public Function<Stats, Stats> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Collector.Characteristics.IDENTITY_FINISH);
    }
}

最后,完成所有这些管道之后,您将能够编写

Map<String, Stats> result = l.stream()
                             .collect(Collectors.groupingBy(data -> data.type,
                                                            new StatsCollector()));

并获得所需的映射。

关于java - 在Java中应用MapReduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47620042/

相关文章:

java - 使用java的网格程序

java - 映射过程数据在hadoop中可能为空

set - perl 6 集合操作中用户定义的比较函数

java - 关闭 Japplet 并不会结束 java 进程

java - 使用打印格式均匀间隔一位或多位数字的整数显示

hadoop - 字符串连接在 pig 中不起作用

ios - 如何在 IoS Swift 中比较 Set<String> 和 String?

python - 打印 pandas 中 2 个数据帧中所有出现的映射数据

java - Spring集成MessageQueue无需轮询

sorting - 在具有 "X"个映射器和 "Y"个缩减器的大型 MapReduce 作业中,排序/洗牌阶段将有多少个不同的复制操作