java - Spark 中聚合键、值对

标签 java apache-spark aggregate

我有一个具有以下格式的 Spark 数据集:

c1    c2
--------
a     d
a     c
a     d
b     e
b     d

我想做的是得到类似下面的东西(x按计数排序)。

c1           x
-----------------------
a     [(d, 2), (c, 1)]
b     [(e, 1), (d, 1)]

我能得到

c1     c2     count
----------------------
a      d      2
a      c      1
b      e      1
b      d      1

通过 df.groupBy(c1, c2).count() ,但不知道如何从那里开始。 我还探索过df.groupBy(c1).agg(collect_list(c2)) ,这给了我

a     [d,c,d]
b     [e,d]

这并不完全是我所需要的,据我了解,扩展到大型数据集时会出现问题。

有什么帮助吗?

最佳答案

这只是一个草稿,因为我没有时间测试它,但你应该有一个想法:

//you initial data
Dataset initialData;
//map values to has set with counts initiali 1 for every element
initialData.map((key, value) => {
  Map<Character, Integer> res = new HashMap<>();
  res.add(value, 1);
  Tuple2<Character, Map<Character, Integer>> tuple = new Tuple2<>(key, res);
  return tuple;
//count elements for given key
}).reduceByKey((map1, map2) => {
  final Map<Character, Integer> res = new HashMap<>();
  res.putAll(map1);
  map2.foreach((key, value) => {
    if (res.contains(key))
      res.put(key, map2(key) + value);
    else
      res.put(key, map2(key));
  }
  return res;
//sort values by count
}.map((key, value) => {
  List<SetEntry<Character, Integer>> entryList = new ArrayList<>(value.entrySet());

  collection.sort(entryList, (entry1, entry2) => 
    entry1.getValue() < entry2.getValue());
  return new Tuple2<>(key, entryList);
}

关于java - Spark 中聚合键、值对,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48874770/

相关文章:

r - 为data.frame中的多个变量按组计算平均值和标准差

java - Oozie Java 操作 : Passing Hbase classpath

java - java中的外推

java - 并行的 Selenium WebDriver - 关闭 WebDriver 实例会中断其他测试

apache-spark - 如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以回读

python-3.x - 如何使用 Python Pandas 将列的日期聚合到每个人的日期列表中?

java - 如何确保在 Java 中销毁 String 对象?

java - 如何将 Spark Row 的数据集转换成字符串?

java - 在 RDD 中存储数组的有效方法

xml - 使用 XSLT 对简单 XML 应用 Muenchian 分组