multithreading - Scala future及其回调在相同的执行上下文中工作

标签 multithreading scala concurrency callback future

我将Akka参与者称为def activateReward,并执行OracleClient.rewardActivate(user)有时非常慢(该数据库不在我的职责范围内,并且属于另一家公司)。

当数据库运行缓慢时,线程池将耗尽,并且无法有效分配更多线程来运行future.onComplete回调,因为回调和Future在同一执行上下文中工作。

请告知如何从分配给 future 的线程中异步执行回调中的代码OracleClient.rewardActivate(user)

class RewardActivatorHelper {

  private implicit val ec = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(1000)
    def execute(runnable: Runnable) {threadPool.submit(runnable)}
    def reportFailure(t: Throwable) {throw t}
  }

  case class FutureResult(spStart:Long, spFinish:Long)

  def activateReward(msg:Msg, time:Long):Unit = {
    msg.users.foreach {
      user =>
        val future:Future[FutureResult] = Future {
          val (spStart, spFinish) = OracleClient.rewardActivate(user)
          FutureResult(spStart, spFinish)
        }

        future.onComplete {
          case Success(futureResult:FutureResult) =>
            futureResult match {
              case res:FutureResult => Logger.writeToLog(Logger.LogLevel.DEBUG,s"started:${res.spStart}finished:${res.spFinish}")
              case _ => Logger.writeToLog(Logger.LogLevel.DEBUG, "some error")
            }

          case Failure(e:Throwable) => Logger.writeToLog(Logger.LogLevel.DEBUG, e.getMessage)    
        }
    }
  }
}

最佳答案

您可以通过执行以下几行操作来显式指定执行上下文,而不是隐式指定onComplete回调:

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration

object Example extends App {
  import scala.concurrent._

  private implicit val ec = new ExecutionContext {
    val threadPool = Executors.newFixedThreadPool(1000)
    def execute(runnable: Runnable) {threadPool.submit(runnable)}
    def reportFailure(t: Throwable) {throw t}
  }

  val f = Future {
    println("from future")
  }

  f.onComplete { _ =>
    println("I'm done.")
  }(scala.concurrent.ExecutionContext.Implicits.global)

  Await.result(f, Duration.Inf)
}

当然,这不会解决数据库无法跟上的潜在问题,但是无论如何还是要知道的。

澄清一下:我让onComplete回调由标准global执行上下文处理。您可能要创建一个单独的对象。

关于multithreading - Scala future及其回调在相同的执行上下文中工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31367438/

相关文章:

java - 等待API服务器的响应

java - 根据Map内容进行同步

c# - 使用线程安全的 .NET 4+ 集合时是否可能导致死锁?

python - 来自远程管理器的 AutoProxy 对象中的字典不可订阅

scala - 错误 : could not find implicit value for parameter timeout: akka. util.Timeout

java - ReentrantReadWriteLock返回的锁是否等同于它的读写锁?

c++ - 如何为同一类对象的成员函数保留单独的变量拷贝?

scala - flinks DataStream 的模拟对象

scala - 在Scala中从初始对象和生成下一个对象的函数创建O(1)内存可迭代

java - 有没有一种方法可以让多个线程添加到同一个列表对象?