scala - 修改了spark中的countByKey

标签 scala apache-spark apache-spark-sql

我有一个数据框,如下所示:

+------+-------+
| key  | label |
+------+-------+
| key1 | a     |
| key1 | b     |
| key2 | a     |
| key2 | a     |
| key2 | a     |
+------+-------+


我想要在spark中更改countByKeys的版本,该版本返回如下输出:

+------+-------+
| key  | count |
+------+-------+
| key1 |     0 |
| key2 |     3 |
+------+-------+
//explanation: 
if all labels under a key are same, then return count of all rows under a key 
else count for that key is 0


我解决这个问题的方法:

脚步:


reduceByKey():连接所有标签(将标签视为字符串)以获取类型的数据框
< key,concat_of_all_labels >
mapValues():按字符解析每个字符串以检查是否都相同。如果它们的返回标签数相同,则返回0。


我刚起步,我觉得应该有一些有效的方法来完成此任务。有没有更好的方法来完成此任务?

最佳答案

这非常简单:通过键同时获得计数和非重复计数,然后...然后...

val df = Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")).toDF("key", "label")
df.groupBy('key)
  .agg(countDistinct('label).as("cntDistinct"), count('label).as("cnt"))
  .select('key, when('cntDistinct === 1, 'cnt).otherwise(typedLit(0)).as("count"))
  .show

+----+-----+
| key|count|
+----+-----+
|key1|    0|
|key2|    3|
+----+-----+

关于scala - 修改了spark中的countByKey,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55666759/

相关文章:

scala - 如何从年月日分区列的列表中提取最新/最近的分区

sql - 从 Hive 中的多个表中选择增量数据

scala - 在 EMR 笔记本上安装其他 scala 库

scala - 使用Intellij在Scala中查找未使用的方法

python - 如何重新格式化 Spark Python 输出

scala - Spark streaming 和 Dstream 如何工作?

scala - Spark : Replace Null value in a Nested column

java - 监视线程状态的规范方法

scala - 如何在 Scala 中使用否定类型?

hadoop - 错误 TableInputFormat : Java. lang.NullPointerException 在 org.Apache.Hadoop.hbase.TableName.valueOf