scala - 何时使用 Scala Future?

标签 scala apache-spark apache-spark-sql future futuretask

我是一名 Spark Scala 程序员。我有一个 Spark 工作,其中包含完成整个工作的子任务。我想使用 Future 并行完成子任务。完成整个工作后,我必须返回整个工作响应。

我听说scala Future是一旦主线程执行并停止,剩余的线程将被杀死,并且您将得到空响应。

我必须使用Await.result来收集结果。但所有博客都告诉您应该避免 Await.result,这是一种不好的做法。

在我的情况下,使用 Await.result 是否正确?

def computeParallel(): Future[String] = {
  val f1 = Future {  "ss" }
  val f2 = Future { "sss" }
  val f3 = Future { "ssss" }

  for {
    r1 <- f1
    r2 <- f2
    r3 <- f3
  } yield (r1 + r2 + r3)
} 

computeParallel().map(result => ???)



据我了解,我们必须在 Web 服务类型的应用程序中使用 Future,其中有一个始终运行且不会退出的进程。但就我而言,一旦逻辑执行(scala 程序)完成,它将退出。

我可以使用 Future 来解决我的问题吗?

最佳答案

除特殊情况外,在 Spark 中使用 future 可能是不可取的,并且简单的并行计算也不是其中之一(为阻塞 I/O 提供非阻塞包装器(例如向外部服务发出请求)很可能是这样的)唯一的特殊情况)。

请注意Future不保证并行性(它们是否以及如何并行执行取决于它们运行的​​ExecutionContext),只是异步。此外,如果您在 Spark 转换中(即在执行器上,而不是驱动器上)生成计算执行的 future,则很可能不会有任何性能改进,因为 Spark 往往会在以下方面做得很好:让执行器上的核心保持忙碌,生成这些 future 所做的就是与 Spark 争夺核心。

总的来说,在组合并行抽象(例如 Spark RDD/DStreams/Dataframes、参与者和 future)时要非常小心:存在很多潜在的雷区,此类组合可能会违反各个组件中的保证和/或约定。

还值得注意的是,Spark 对中间值的可序列化性有要求,并且 future 通常不可序列化,因此 Spark 阶段不能产生 future;这意味着您基本上别无选择,只能 Await关于在某个阶段产生的 future 。

如果您仍然想在 Spark 阶段生成 future(例如将它们发布到 Web 服务),最好使用 Future.sequence将 future 折叠成一个,然后 Await对此(请注意,我还没有测试过这个想法:我假设有一个隐式的 CanBuildFrom[Iterator[Future[String]], String, Future[String]] 可用):

def postString(s: String): Future[Unit] = ???

def postStringRDD(rdd: RDD[String]): RDD[String] = {
  rdd.mapPartitions { strings =>
    // since this is only get used for combining the futures in the Await, it's probably OK to use the implicit global execution context here
    implicit val ectx = ???
    Await.result(strings.map(postString))
  }
  rdd  // Pass through the original RDD
}

关于scala - 何时使用 Scala Future?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58015387/

相关文章:

scala - flink reduceGroup 中的迭代器行为

scala - scala 中的日期差异

java - 包私有(private)方法覆盖时发生AbstractMethodError

python - 将符合上次修改窗口的 S3 文件读入 DataFrame

java - 在 Spark java 中使用 Dataframe 合并两个 parquet 文件

斯卡拉这是什么意思? "{/* compiled code */}"

amazon-s3 - Spark-1.4.1 saveAsTextFile 到 S3 在 emr-4.0.0 上非常慢

apache-spark - 无法将 Spark 应用程序提交到集群,卡在 "UNDEFINED"

scala - 如何将 csv 直接加载到 Spark 数据集中?

pandas - 使 groupby.apply 更高效或转换为 spark