我想同时对单个 RDD 执行 2 个操作。我写过这样的代码
val conf = new SparkConf().setAppName("Foo")
val sc = new SparkContext(conf)
val sqlSc = new SQLContext(sc)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
val inputPath = path
val rdd = sc.textFile(inputPath).cache()
val f1 = Future {
val schama1 = StructType(List(StructField("a", StringType, true), StructField("b", StringType, true), StructField("c", LongType, true)))
val rdd1 = rdd.map(func1).filter(_.isDefined).flatMap(x => x)
val df1 = sqlSc.createDataFrame(rdd, schema)
formSubmissionDataFrame.save("/foo/", "com.databricks.spark.avro")
0
}
val f2 = Future {
val schema2 = StructType(List(StructField("d", StringType, true), StructField("e", StringType, true)))
val rdd2 = rdd.map(func2).filter(_.isDefined).flatMap(x => x)
val df2 = sqlSc.createDataFrame(rdd2, schema2)
pageViewDataFrame.save("/bar/", "com.databricks.spark.avro")
0
}
val result = for {
r1 <- f1
r2 <- f2
} yield(r1 + r2)
result onSuccess{
case r => println("done")
}
Await.result(result, Duration.Inf)
当我运行这段代码时,我没有看到预期的效果。目录 bar 有很多临时文件等...但是 foo 什么都没有...所以这两个数据集似乎不是并行创建的。
在 spark 驱动程序中使用 future 是个好主意吗?我做得对吗?我应该做些不同的事情吗?
最佳答案
为了并行执行两个或多个 Spark JOBS(操作),Spark Context 需要以 FAIR 调度程序模式运行。
在所有转换的驱动程序中,仅生成依赖图以供执行,但实际执行仅在调用操作时发生。通常,驱动程序会等待执行发生在由 Spark 从节点管理的节点之间。在您的情况下,Spark Master 在第一个作业结束之前不会开始执行第二个作业,因为默认情况下 Spark 调度是 FIFO。
你可以如下设置conf来开启并行执行
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
关于scala - 在 spark 作业中使用 Future,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35965835/