scala - Spark MapReduce 中的意外结果

标签 scala apache-spark mapreduce

我是 Spark 的新手,想了解 MapReduce 如何在幕后完成以确保我正确使用它。 This post提供了一个很好的答案,但我的结果似乎不符合所描述的逻辑。我正在运行 Spark Quick Start命令行中的 Scala 指南。当我正确地进行线长加法时,结果就好了。总行长为 1213:

scala> val textFile = sc.textFile("README.md")

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))

scala> val linesWithSparkLengths = linesWithSpark.map(s => s.length)

scala> linesWithSparkLengths.foreach(println)

Result:
14
78
73
42
68
17
62
45
76
64
54
74
84
29
136
77
77
73
70

scala> val totalLWSparkLength = linesWithSparkLengths.reduce((a,b) => a+b)
    totalLWSparkLength: Int = 1213

当我稍微调整它以使用 (a-b) 而不是 (a+b) 时,
scala> val totalLWSparkTest = linesWithSparkLengths.reduce((a,b) => a-b)

根据 this post 中的逻辑,我预计为 -1185 :
List(14,78,73,42,68,17,62,45,76,64,54,74,84,29,136,77,77,73,70).reduce( (x,y) => x - y )
  Step 1 : op( 14, 78 ) will be the first evaluation. 
     x is 14 and y is 78. Result of x - y = -64.
  Step 2:  op( op( 14, 78 ), 73 )
     x is op(14,78) = -64 and y = 73. Result of x - y = -137
  Step 3:  op( op( op( 14, 78 ), 73 ), 42) 
     x is op( op( 14, 78 ), 73 ) = -137 and y is 42. Result is -179.
  ...
  Step 18:  op( (... ), 73), 70) will be the final evaluation.
     x is -1115 and y is 70. Result of x - y is -1185.

然而,奇怪的事情发生了:
scala> val totalLWSparkTest = linesWithSparkLengths.reduce((a,b) => a-b)
totalLWSparkTest: Int = 151

当我再次运行它...
scala> val totalLWSparkTest = linesWithSparkLengths.reduce((a,b) => a-b)
totalLWSparkTest: Int = -151

谁能告诉我为什么结果是 151(或 -151)而不是 -1185?

最佳答案

这是因为减法既不是结合的也不是交换的。让我们从关联性开始:

(- (- (- 14 78) 73) 42) 
(- (- -64 73) 42)
(- -137 42) 
-179

不一样

(- (- 14 78) (- 73 42))
(- -64 (- 73 42))
(- -64 31)
-95

现在是交换性的时候了:

(- (- (- 14 78) 73) 42) ;; From the previous example

不一样

(- (- (- 42 73) 78) 14)
(- (- -31 78) 14)
(- -109 14)
-123

Spark先申请reduce在单个分区上,然后以任意顺序合并部分结果。如果您使用的函数不符合一项或两项标准,则最终结果可能是不确定的。

关于scala - Spark MapReduce 中的意外结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35190049/

相关文章:

c - 下面的 C 语言 MapReduce 代码有什么问题?

Hadoop - 在xml中增加 map task 不会在运行时增加 map task

hadoop - 将Pig脚本的输出存储在动态生成文件名的文件中

scala - 在 Spark 中写入和读取原始字节数组 - 使用序列文件 SequenceFile

scala - 如何使用Spark hadoopFile方法使用值类型为Text的自定义输入格式?

apache-spark - NoSuchMethodError 使用 Databricks Spark-Avro 3.2.0

python - Spark 和 Python : strategy for parallelize/map statsmodels sarimax

scala - 为什么与 tuple 类型的提取器一起使用的 for-comprehension 会导致 `filter` 上的编译警告?

amazon-web-services - Hadoop s3 配置文件丢失

scala - AnyRef.eq 方法的默认实现