scala - 在 spark 作业中使用 Future

标签 scala apache-spark

我想同时对单个 RDD 执行 2 个操作。我写过这样的代码

val conf = new SparkConf().setAppName("Foo")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
val inputPath = path
val rdd = sc.textFile(inputPath).cache()

val f1 = Future {
  val schama1 = StructType(List(StructField("a", StringType, true), StructField("b", StringType, true), StructField("c", LongType, true)))
  val rdd1 = rdd.map(func1).filter(_.isDefined).flatMap(x => x)
  val df1 = sqlSc.createDataFrame(rdd, schema)
  formSubmissionDataFrame.save("/foo/", "com.databricks.spark.avro")
  0
}

val f2 = Future {
  val schema2 = StructType(List(StructField("d", StringType, true), StructField("e", StringType, true)))
  val rdd2 = rdd.map(func2).filter(_.isDefined).flatMap(x => x)
  val df2 = sqlSc.createDataFrame(rdd2, schema2)
  pageViewDataFrame.save("/bar/", "com.databricks.spark.avro")
  0
}

val result = for {
  r1 <- f1
  r2 <- f2
} yield(r1 + r2)

result onSuccess{
  case r => println("done")
}

Await.result(result, Duration.Inf)

当我运行这段代码时,我没有看到预期的效果。目录 bar 有很多临时文件等...但是 foo 什么都没有...所以这两个数据集似乎不是并行创建的。

在 spark 驱动程序中使用 future 是个好主意吗?我做得对吗?我应该做些不同的事情吗?

最佳答案

为了并行执行两个或多个 Spark JOBS(操作),Spark Context 需要以 FAIR 调度程序模式运行。

在所有转换的驱动程序中,仅生成依赖图以供执行,但实际执行仅在调用操作时发生。通常,驱动程序会等待执行发生在由 Spark 从节点管理的节点之间。在您的情况下,Spark Master 在第一个作业结束之前不会开始执行第二个作业,因为默认情况下 Spark 调度是 FIFO。

你可以如下设置conf来开启并行执行

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

详情请访问Spark Scheduling within an application

关于scala - 在 spark 作业中使用 Future,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35965835/

相关文章:

scala - 如何获取刚刚由 ScalaQuery 插入的自增 ID

scala - Spark 数据帧 : Extract a column based on the value of another column

Python 与 Scala(用于 Spark 作业)

dataframe - Spark : How to aggregate/reduce records based on time difference?

scala - Scala 中的构造函数局部变量

scala - JVM的NUMA意识

scala - 整数对的尾递归有界流(Scala)?

scala - 使用 circe 将 Map[String, MyCaseClass] 编码为 Seq[String, String]

python - Spark数据框将多行转换为列

amazon-ec2 - 如何充分利用集群中所有Spark节点?