Scala并行计算并在一个方法返回结果时中断

标签 scala

给出了几种方法

def methodA(args:Int):Int={
    //too long calculation
    result
}

def methodB(args:Int):Int={
    //too long calculation
    result
}

def methodC(args:Int):Int={
    //too long calculation
    result
}

它们具有相同的参数集并返回相同类型的结果。

需要并行计算方法,当一个方法返回结果时我需要中断其他方法。

最佳答案

这是一个有趣的问题。我不太有经验,所以对于具有一些可取消的 future 实现的初学者来说,可能有一种更像 Scala 的方法来做到这一点。但是,我用简洁换取了可读性。

import java.util.concurrent.{Callable, FutureTask}

import scala.concurrent.{Await, Future}

import scala.concurrent.duration._

import scala.concurrent.ExecutionContext.Implicits.global

def longRunningTask(foo: Int => String)(arg: Int): FutureTask[String] = {
  new FutureTask[String](new Callable[String]() {
    def call(): String = {
      foo(arg)
    }
  })
}
def killTasks(tasks: Seq[FutureTask[_]]): Unit = {
  tasks.foreach(_.cancel(false))
}
val task1 = longRunningTask(methodA)(3)
val task2 = longRunningTask(methodB)(4)

val task1Future = Future {
  task1.run()
  task1.get()
}
val task2Future = Future {
  task2.run()
  task2.get()
}
val firstFuture = Future.firstCompletedOf(List(task1Future, task2Future))
firstFuture.onSuccess({
  case result => {
    println(result)
    killTasks(List(task1, task2))
  }
})

那么,我在这里做了什么?我创建了一个辅助方法,它创建了一个 FutureTask(来自 Java API),它对一些 Int 参数执行一个 Int => String 操作。我还创建了一个辅助方法,它可以杀死集合中的所有 FutureTask

之后,我使用两种不同的方法和输入创建了两个长时间运行的任务(这是您的工作)。

下一部分有点难看,基本上我用 Future monad 运行 FutureTasks 并调用 get 来获取结果。

在那之后,我基本上只使用 Future companion 中的 firstCompletedOf 方法来处理第一个完成任务,并在处理完成后终止其余任务。

我可能会努力使这段代码更好一些,但从这里开始。

您还应该尝试使用不同的 'ExecutionContext's, and parallelism levels. 进行检查

关于Scala并行计算并在一个方法返回结果时中断,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38804161/

相关文章:

scala - 使用 Oozie 将 Spark 转为 Hbase

scala - 我可以使用 sbt 构建自己的独立控制台吗?

scala - Slick 2.1.0 中可选的嵌套映射实体

scala - 用于 Scala 对象和特征的 Clojure 互操作

scala - Scala 中 "static"方法的特征?

scala - 在 scala 中将 Int 添加到字符串

scala - 使用 scala.xml.pull 提取节点及其所有子节点的最佳方法?

scala - 为什么 Scala 中的柯里化(Currying)需要多个参数列表?

scala - Spark : Dataframe Serialization

scala - Scala 中的内联函数歧义