apache-spark - Apache Spark :Why reduceByKey transformation executes the DAG?

标签 apache-spark

我正面临一个奇怪的问题。据我了解,Spark 中操作的 DAG 仅在执行操作时执行。但是,我可以看到 reduceByKey() 操作(作为一种转换)开始执行 DAG。

重现步骤 。尝试以下代码

SparkConf conf =new SparkConf().setMaster("local").setAppName("Test");
JavaSparkContext context=new JavaSparkContext(conf);

JavaRDD<String> textFile = context.textFile("any non-existing path"); // This path should not exist

JavaRDD<String> flatMap = textFile.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(x -> new Tuple2<String, Integer>((String) x, 1));

注意:文件的路径不应是任何现有路径。换句话说,文件不应该存在。

如果您执行此代码,则不会按预期发生任何事情。但是,如果您将以下行添加到程序中并执行
mapToPair.reduceByKey((x, y) -> x + y);

它给出了以下异常:
Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:

这意味着它已经开始执行 DAG。由于reduceByKey() 是一种转换,因此在执行collect() 或take() 等操作之前不应该是这种情况。

Spark 版本:2.0.0。请提供您的建议。

最佳答案

这是因为,真正执行的并不是 DAG(例如:它的整个实现)。

发生的情况是 reduceByKey 需要一个 Partitioner 才能工作。如果您不提供,Spark 会根据约定和默认值创建一个。代码中的“默认partiionner”如下注释:

/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
* If any of the RDDs already has a partitioner, choose that one.
*
* Otherwise, we use a default HashPartitioner. For the number of partitions, if
* spark.default.parallelism is set, then we'll use the value from SparkContext
* defaultParallelism, otherwise we'll use the max number of upstream partitions.
*
* Unless spark.default.parallelism is set, the number of partitions will be the
* same as the number of partitions in the largest upstream RDD, as this should
* be least likely to cause out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/

这个定义意味着,在某些情况下,计算来自所有上游 RDD 的分区数。在您的情况下,这意味着要求“文件系统”(可以是 Hadoop,也可以是本地的,...)执行任何必要的操作(例如,对 Hadoop 文件系统的一次调用可以返回多个文件,每个文件也可以拆分根据其 InputFormat 定义的各种优化,只有通过实际查找才能知道)。

所以这就是这里执行的内容,而不是实际的 DAG(例如;您的 map/flatMap/aggregate,...)。

您可以通过在这个 reduce by key 变体中提供您自己的分区器来避免它:
 reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

关于apache-spark - Apache Spark :Why reduceByKey transformation executes the DAG?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42084703/

相关文章:

apache-spark - 通过在 Java 流中使用 SparkSession 在 Spark 中应用自定义函数进行分组?

scala - 过滤字符串上的 Spark DataFrame 包含

apache-spark - 非 Databricks 平台上的 Spark Delta 格式

scala - 在 Spark scala 中合并

apache-spark - 如何按列的值分组处理 Spark DataFrame

apache-spark - JTS 拓扑套件中的 STRtree : bulk load data and build index

apache-spark - Pyspark 数据框中的 regexp_replace

apache-spark - 使用 kubernetes 在 spark 2.3 中处理 spark-submit 的远程依赖项

apache-spark - 无法使用 PySpark 从 Elasticsearch 读取

apache-spark - Spark groupBy vs repartition 加 mapPartitions