我将Akka参与者称为def activateReward,并执行OracleClient.rewardActivate(user)有时非常慢(该数据库不在我的职责范围内,并且属于另一家公司)。
当数据库运行缓慢时,线程池将耗尽,并且无法有效分配更多线程来运行future.onComplete回调,因为回调和Future在同一执行上下文中工作。
请告知如何从分配给 future 的线程中异步执行回调中的代码OracleClient.rewardActivate(user)
class RewardActivatorHelper {
private implicit val ec = new ExecutionContext {
val threadPool = Executors.newFixedThreadPool(1000)
def execute(runnable: Runnable) {threadPool.submit(runnable)}
def reportFailure(t: Throwable) {throw t}
}
case class FutureResult(spStart:Long, spFinish:Long)
def activateReward(msg:Msg, time:Long):Unit = {
msg.users.foreach {
user =>
val future:Future[FutureResult] = Future {
val (spStart, spFinish) = OracleClient.rewardActivate(user)
FutureResult(spStart, spFinish)
}
future.onComplete {
case Success(futureResult:FutureResult) =>
futureResult match {
case res:FutureResult => Logger.writeToLog(Logger.LogLevel.DEBUG,s"started:${res.spStart}finished:${res.spFinish}")
case _ => Logger.writeToLog(Logger.LogLevel.DEBUG, "some error")
}
case Failure(e:Throwable) => Logger.writeToLog(Logger.LogLevel.DEBUG, e.getMessage)
}
}
}
}
最佳答案
您可以通过执行以下几行操作来显式指定执行上下文,而不是隐式指定onComplete
回调:
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
object Example extends App {
import scala.concurrent._
private implicit val ec = new ExecutionContext {
val threadPool = Executors.newFixedThreadPool(1000)
def execute(runnable: Runnable) {threadPool.submit(runnable)}
def reportFailure(t: Throwable) {throw t}
}
val f = Future {
println("from future")
}
f.onComplete { _ =>
println("I'm done.")
}(scala.concurrent.ExecutionContext.Implicits.global)
Await.result(f, Duration.Inf)
}
当然,这不会解决数据库无法跟上的潜在问题,但是无论如何还是要知道的。
澄清一下:我让
onComplete
回调由标准global
执行上下文处理。您可能要创建一个单独的对象。
关于multithreading - Scala future及其回调在相同的执行上下文中工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31367438/