scala - Spark 2.0 parallel JobProgressListener fiasco

Tags: scala apache-spark parallel-processing apache-spark-sql scala-collections

I have a scenario where I need to fire many SQL queries in parallel from a for loop and collect the result lists into a ListBuffer. However, I hit many errors while the loop runs, and the results are incomplete. To illustrate, here is a very simple reproducible example:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter: Int = 0
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i1 <- dig) {
  for (i2 <- dig) {
    for (i3 <- dig) {
      println("||==="+i1+"=="+i2+"=="+i3+"===="+(i1*100+i2*10+i3*1)+"===="+counter+"=======||")
      counter += 1
      results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"','"+(i1*100+i2*10+i3*1)+"','"+counter+"',*  from df ").collect().toList
    }
  }
}
results(0).take(2).foreach(println)
results.size
results.flatten.size

The code above simply counts from 0 to 999 and, on each count, appends a 2-row list (the two rows of the table) to the ListBuffer, together with a "serial" counter value for comparison.

Results of running the code:

||===9==8==3====983====969=======||
||===9==8==5====985====969=======||
||===9==8==1====981====969=======||
||===9==8==2====982====969=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 784
||===9==8==7====987====974=======||
||===5==8==9====589====975=======||
||===9==8==4====984====976=======||
||===9==8==6====986====976=======||
||===9==8==9====989====977=======||
||===9==8==8====988====977=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 773
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 790
||===5==9==0====590====980=======||
||===5==9==2====592====980=======||
||===5==9==5====595====980=======||
||===5==9==1====591====980=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 795
||===5==9==3====593====984=======||
||===5==9==7====597====985=======||
||===5==9==8====598====985=======||
||===5==9==6====596====987=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 798
||===5==9==9====599====988=======||
||===5==9==4====594====989=======||
||===9==9==0====990====990=======||
||===9==9==5====995====991=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 784
||===9==9==2====992====992=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 789
||===9==9==3====993====993=======||
||===9==9==1====991====994=======||
||===9==9==4====994====995=======||
||===9==9==7====997====996=======||
||===9==9==8====998====997=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 790
||===9==9==6====996====998=======||
||===9==9==9====999====999=======||
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 805
16/09/20 14:10:05 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 798

scala> results(0).take(2).foreach(println)
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 802
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 805
[trial,0,0,0,0,16,a]
[trial,0,0,0,0,16,b]

scala> results.size
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 839
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 840
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 839
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 842
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 855
res3: Int = 1000

scala> results.flatten.size
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 860
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 854
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 860
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 868
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 874
res4: Int = 2000
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 882

scala> 

[Stage 589:=(28 + 0) / 28][Stage 590:>(27 + 1) / 28][Stage 591:>(20 + 7) / 28]16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 888
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 895
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 898
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 898
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 905
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 906
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 907
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 902
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 905
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 913
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 915
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 916
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 913
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 920
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 942
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 946
16/09/20 14:10:06 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 942
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 946
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 948
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 956
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 952
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 965
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 965
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 966
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 976
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 976
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 990
16/09/20 14:10:07 WARN org.apache.spark.ui.jobs.JobProgressListener: Job completed for unknown job 999


scala> 

These are just some of the warnings I receive.

You can see that the counter sometimes "wobbles" (the printed counter values are not strictly increasing).

**This is where the trouble starts**

There are many warnings, but results.size = 1000 and results.flatten.size = 2000, as expected.

But trying to count to 10000 the same way produces many more warnings:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter: Int = 0
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i1 <- dig) {
  for (i2 <- dig) {
    for (i3 <- dig) {
      for (i4 <- dig) {
        println("||==="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=======||")
        counter += 1
        results += spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
      }
    }
  }
}
results(0).take(2).foreach(println)
results.size
results.flatten.size

Output:

16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8797
||===0==9==4==3====943====9998=======||
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8799
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8801
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8802
||===0==9==4==4====944====9999=======||
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8803
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8804
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8805
16/09/20 14:18:24 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 8806

And the results:

scala> results(0).take(2).foreach(println)
[trial,3,0,0,0,3000,7,a]
[trial,3,0,0,0,3000,7,b]

scala> results.size
res3: Int = 9999

scala> results.flatten.size
res4: Int = 19998

One value is missing.

I invite you to try the following code, counting to 100000:

import scala.collection.mutable.ListBuffer
val dummy = List("a","b").toDF.createOrReplaceTempView("df")
spark.catalog.cacheTable("df")
val dig = (0 to 9).par
var counter: Int = 0
var results = ListBuffer[List[org.apache.spark.sql.Row]]()

for (i0 <- dig ) {
  for (i1 <- dig ) {
    for (i2 <- dig ) {
      for (i3 <- dig ) {
        for (i4 <- dig ) {
          println("============="+i0+"=="+i1+"=="+i2+"=="+i3+"=="+i4+"===="+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"===="+counter+"=========") 
          counter +=1
          results += spark.sql("select 'trial','"+i0+"','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i0*10000+i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
        }
      }
    }
  }
}

Not only do I get a flood of JobProgressListener warnings during the run, but the results are incomplete and non-deterministic:

scala> results(0).take(2).foreach(println)
[trial,8,5,0,0,0,85000,13,a]
[trial,8,5,0,0,0,85000,13,b]

scala> results.size
res3: Int = 99999

scala> results.flatten.size
res4: Int = 192908

In my real-life case I frequently get a "spark.sql.execution.id is already set" exception at random points during the run.

How can I solve this?

I have already tried

spark.conf.set("spark.extraListeners","org.apache.spark.scheduler.StatsReportListener,org.apache.spark.scheduler.EventLoggingListener")

and read Spark 1.6: java.lang.IllegalArgumentException: spark.sql.execution.id is already set,

Apache Spark: network errors between executors

and http://docs.scala-lang.org/overviews/parallel-collections/overview.html on side-effecting operations, but there seem to be too many directions to pursue.
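The side-effect hazard that the parallel-collections overview warns about can be reproduced without Spark at all. A minimal standalone sketch (plain threads stand in for the `.par` loops; all names are illustrative):

```scala
import scala.collection.mutable.ListBuffer

val unsafe = ListBuffer[Int]() // appended to without synchronization
val safe   = ListBuffer[Int]() // appended to under a lock
val lock   = new Object

val threads = (0 until 8).map { _ =>
  new Thread(new Runnable {
    def run(): Unit =
      for (i <- 0 until 10000) {
        unsafe += i                      // racy: concurrent appends can be lost
        lock.synchronized { safe += i }  // safe: appends are serialized
      }
  })
}
threads.foreach(_.start())
threads.foreach(_.join())

println(s"unsafe: ${unsafe.size}  safe: ${safe.size}") // safe is always 80000
```

On most runs the unsafe buffer ends up smaller than 80000, which is exactly the shape of the missing-results symptom above.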

IMHO the bug most relevant to this problem is https://issues.apache.org/jira/browse/SPARK-10548 , which was supposed to be resolved in Spark 1.6.

Can someone give some hints for tackling this situation? My real-life case has complexity similar to the 100000 count, and it fails at a random stage of execution.

I deployed a GCP Dataproc cluster:

gcloud dataproc clusters create clusTest --zone us-central1-b --master-machine-type n1-highmem-16 --num-workers 2 --worker-machine-type n1-highmem-8 --num-worker-local-ssds 2 --num-preemptible-workers 8 --scopes 'https://www.googleapis.com/auth/cloud-platform' --project xyz-analytics

a screenshot of the Spark UI during the run

Best Answer

the results are incomplete and non-deterministic

The non-deterministic part should be the hint. You have a race condition when adding results to the ListBuffer (parallel updates are not thread-safe, so if you run long enough you will eventually lose some results).

I tried this locally and could reproduce the incomplete results. Simply wrapping the append to the buffer in a synchronized block makes the results complete. You could also use a concurrent data structure for your jobs, so you don't need an explicit synchronized block, e.g. java.util.concurrent.ConcurrentLinkedQueue or similar.
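A sketch of that lock-free variant, with the Spark query replaced by a placeholder computation since only the collection behavior matters here (names are illustrative):

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Thread-safe queue: concurrent add() calls never lose elements,
// so no explicit synchronized block is needed around the append.
val results = new ConcurrentLinkedQueue[List[String]]()

val threads = (0 until 4).map { t =>
  new Thread(new Runnable {
    def run(): Unit =
      for (i <- 0 until 100)
        results.add(List(s"row-$t-$i")) // stands in for spark.sql(...).collect().toList
  })
}
threads.foreach(_.start())
threads.foreach(_.join())

println(results.size) // always 400: no appends are lost
```

The trade-off is that a queue does not preserve a meaningful insertion order across threads, but neither does the original parallel loop.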

So the following fixes the problem:

for (i1 <- dig) {
  for (i2 <- dig) {
    for (i3 <- dig) {
      for (i4 <- dig) {
        counter += 1
        val result = spark.sql("select 'trial','"+i1+"','"+i2+"','"+i3+"', '"+i4+"','"+(i1*1000+i2*100+i3*10+i4*1)+"','"+counter+"',*  from df ").collect().toList
        synchronized {
          results += result
        }
      }
    }
  }
}
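Note that the shared counter is still incremented without synchronization, which is why its values "wobble" in the question's output; unsynchronized `counter += 1` can also lose updates entirely. A hedged sketch of making just the counter safe with java.util.concurrent.atomic.AtomicInteger (standalone, no Spark):

```scala
import java.util.concurrent.atomic.AtomicInteger

val counter = new AtomicInteger(0)

val threads = (0 until 8).map { _ =>
  new Thread(new Runnable {
    def run(): Unit =
      // incrementAndGet is atomic: no two threads can observe or
      // produce the same value, and no increment is ever lost
      for (_ <- 0 until 1000) counter.incrementAndGet()
  })
}
threads.foreach(_.start())
threads.foreach(_.join())

println(counter.get) // always 8000
```

In the loop above you would replace `counter += 1` with `val c = counter.incrementAndGet()` and interpolate `c` into the query string.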

As for the "spark.sql.execution.id is already set" exception: I could not reproduce it with the example above. (However, I ran the code on local Spark.) Is it reproducible on a local setup?

For scala - Spark 2.0 parallel JobProgressListener fiasco, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/39597332/
