我是一个新的 Spark 蜜蜂,我正在尝试使用以下 Spark 函数执行分组和计数:
Dataset<Row> result = dataset
.groupBy("column1", "column2")
.count();
但我读过here使用 group by 并不是一个好主意,因为它没有组合器,这反过来会影响 Spark 作业的运行时效率。 相反,应该使用reduceByKey函数进行聚合操作。
所以我尝试使用 reduceByKey
功能,但不适用于 dataset
。相反,数据集使用 reduce(ReduceFunction<Row> func)
.
由于我找不到使用reduce函数执行分组和计数的示例,因此我尝试将其转换为JavaRDD
并使用reduceByKey
:
//map each row to 1 and then group them by key
JavaPairRDD<String[], Integer> mapOnes;
try {
mapOnes = dailySummary.javaRDD().mapToPair(
new PairFunction<Row, String[], Integer>() {
@Override
public Tuple2<String[], Integer> call(Row t) throws Exception {
return new Tuple2<String[], Integer>(new String[]{t.getAs("column1"), t.getAs("column2")}, 1);
}
});
}catch(Exception e) {
log.error("exception in mapping ones: "+e);
throw new Exception();
}
JavaPairRDD<String[], Integer> rowCount;
try {
rowCount = mapOnes.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
}catch(Exception e) {
log.error("exception in reduce by key: "+e);
throw new Exception();
}
但这也给出了异常(exception),如 org.apache.spark.SparkException: Task not serializable
对于 mapToPair
功能。
任何人都可以建议一种使用数据集的 reduce
进行分组和执行计数的更好方法吗?和map
功能。
感谢任何帮助。
最佳答案
您添加的链接中的groupBy指的是RDD。在 RDD 语义中,groupBy 基本上会根据键对所有数据进行混洗,即将与键相关的所有值带到一处。
这就是为什么建议使用reduceByKey,因为reduceByKey首先在每个分区上执行reduce操作,并且仅对减少的值进行混洗,这意味着流量会减少很多(并防止将所有内容都转移到一个分区时出现内存不足问题)。
在数据集中,groupBy 的行为有所不同。它不提供数据集作为返回对象,而是提供 KeyValueGroupedDataset 对象。当您确实依赖此对象(或更通用的 agg)时,它基本上定义了一个与reduceByKey 非常相似的reducer。
这意味着不需要单独的reduceByKey方法(数据集groupby实际上是reduceByKey的一种形式)。
坚持原来的groupBy(...).count(...)
关于java - 如何在Spark 2.3.1中使用map和reduce函数进行分组和计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51849735/