java - 如何在Spark 2.3.1中使用map和reduce函数进行分组和计数

标签 java apache-spark mapreduce dataset

我是一个新的 Spark 蜜蜂,我正在尝试使用以下 Spark 函数执行分组和计数:

 Dataset<Row> result =  dataset
       .groupBy("column1", "column2")
       .count();

但我读过here使用 group by 并不是一个好主意,因为它没有组合器,这反过来会影响 Spark 作业的运行时效率。 相反,应该使用reduceByKey函数进行聚合操作。

所以我尝试使用 reduceByKey功能,但不适用于 dataset 。相反,数据集使用 reduce(ReduceFunction<Row> func) .

由于我找不到使用reduce函数执行分组和计数的示例,因此我尝试将其转换为JavaRDD并使用reduceByKey :

//map each row to 1 and then group them by key 
JavaPairRDD<String[], Integer> mapOnes;
            try {
                 mapOnes = dailySummary.javaRDD().mapToPair(
                        new PairFunction<Row, String[], Integer>() {
                            @Override
                            public Tuple2<String[], Integer> call(Row t) throws Exception {
                                return new Tuple2<String[], Integer>(new String[]{t.getAs("column1"), t.getAs("column2")}, 1);
                            }   
                });
            }catch(Exception e) {
                log.error("exception in mapping ones: "+e);
                throw new Exception();
            }


        JavaPairRDD<String[], Integer> rowCount;
        try {
            rowCount = mapOnes.reduceByKey(
                new Function2<Integer, Integer, Integer>() {

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1+v2;
                    }
                });
        }catch(Exception e) {
            log.error("exception in reduce by key: "+e);
            throw new Exception();
        }

但这也给出了异常(exception),如 org.apache.spark.SparkException: Task not serializable对于 mapToPair功能。

任何人都可以建议一种使用数据集的 reduce 进行分组和执行计数的更好方法吗?和map功能。

感谢任何帮助。

最佳答案

您添加的链接中的groupBy指的是RDD。在 RDD 语义中,groupBy 基本上会根据键对所有数据进行混洗,即将与键相关的所有值带到一处。

这就是为什么建议使用reduceByKey,因为reduceByKey首先在每个分区上执行reduce操作,并且仅对减少的值进行混洗,这意味着流量会减少很多(并防止将所有内容都转移到一个分区时出现内存不足问题)。

在数据集中,groupBy 的行为有所不同。它不提供数据集作为返回对象,而是提供 KeyValueGroupedDataset 对象。当您确实依赖此对象(或更通用的 agg)时,它基本上定义了一个与reduceByKey 非常相似的reducer。

这意味着不需要单独的reduceByKey方法(数据集groupby实际上是reduceByKey的一种形式)。

坚持原来的groupBy(...).count(...)

关于java - 如何在Spark 2.3.1中使用map和reduce函数进行分组和计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51849735/

相关文章:

java - 如何在 Hadoop map/reduce 的映射器中写入多个文件?

java - 具有相对路径的 JSF 导航规则

hadoop - hadoop中的错误: “Exception in thread ” main“java.lang.ClassNotFoundException”

java - 32 位整数的二进制表示

Java、Spark、Sql2o、H2 : Could not acquire a connection from DataSource - IO Exception

apache-spark - 从 Spark 数据框中选择不同值的最有效方法是什么?

python - 将数据框转换为 JSON(在 pyspark 中),然后选择所需的字段

java - OOZIE HIVE操作-工作流.xml属性不会传递给子任务

java - 有没有办法使用 jpackage 更改安装程序文件的图标?

Java:如何在不迭代的情况下从 List<T> 转换为 Map<f1(T), List(f2(T))>