我是一名 Spark Scala 程序员。我有一个 Spark 工作,其中包含完成整个工作的子任务。我想使用 Future
并行完成子任务。完成整个工作后,我必须返回整个工作响应。
我听说scala Future
是一旦主线程执行并停止,剩余的线程将被杀死,并且您将得到空响应。
我必须使用Await.result
来收集结果。但所有博客都告诉您应该避免 Await.result
,这是一种不好的做法。
在我的情况下,使用 Await.result
是否正确?
def computeParallel(): Future[String] = {
val f1 = Future { "ss" }
val f2 = Future { "sss" }
val f3 = Future { "ssss" }
for {
r1 <- f1
r2 <- f2
r3 <- f3
} yield (r1 + r2 + r3)
}
computeParallel().map(result => ???)
据我了解,我们必须在 Web 服务类型的应用程序中使用 Future
,其中有一个始终运行且不会退出的进程。但就我而言,一旦逻辑执行(scala 程序)完成,它将退出。
我可以使用 Future
来解决我的问题吗?
最佳答案
除特殊情况外,在 Spark 中使用 future 可能是不可取的,并且简单的并行计算也不是其中之一(为阻塞 I/O 提供非阻塞包装器(例如向外部服务发出请求)很可能是这样的)唯一的特殊情况)。
请注意Future
不保证并行性(它们是否以及如何并行执行取决于它们运行的ExecutionContext
),只是异步。此外,如果您在 Spark 转换中(即在执行器上,而不是驱动器上)生成计算执行的 future,则很可能不会有任何性能改进,因为 Spark 往往会在以下方面做得很好:让执行器上的核心保持忙碌,生成这些 future 所做的就是与 Spark 争夺核心。
总的来说,在组合并行抽象(例如 Spark RDD/DStreams/Dataframes、参与者和 future)时要非常小心:存在很多潜在的雷区,此类组合可能会违反各个组件中的保证和/或约定。
还值得注意的是,Spark 对中间值的可序列化性有要求,并且 future 通常不可序列化,因此 Spark 阶段不能产生 future;这意味着您基本上别无选择,只能 Await
关于在某个阶段产生的 future 。
如果您仍然想在 Spark 阶段生成 future(例如将它们发布到 Web 服务),最好使用 Future.sequence
将 future 折叠成一个,然后 Await
对此(请注意,我还没有测试过这个想法:我假设有一个隐式的 CanBuildFrom[Iterator[Future[String]], String, Future[String]]
可用):
def postString(s: String): Future[Unit] = ???
def postStringRDD(rdd: RDD[String]): RDD[String] = {
rdd.mapPartitions { strings =>
// since this is only get used for combining the futures in the Await, it's probably OK to use the implicit global execution context here
implicit val ectx = ???
Await.result(strings.map(postString))
}
rdd // Pass through the original RDD
}
关于scala - 何时使用 Scala Future?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58015387/