scala - Actors 中 future 的执行上下文

标签 scala akka actor future

我有一个 Actor,并且在某些消息上我正在运行一些返回 Future 的方法。

 def receive: Receive = {

    case SimpleMessge() =>
        val futData:Future[Int] = ...
        futData.map { data =>
           ... 
        }
}

是否可以传递实际上下文来等待此数据?或者,如果我需要 SimpleMessage 中的这些数据,Await 是我能做的最好的事情?

最佳答案

如果您确实需要等待 future 完成后再处理下一条消息,您可以尝试如下操作:

object SimpleMessageHandler{
  case class SimpleMessage()
  case class FinishSimpleMessage(i:Int)
}

class SimpleMessageHandler extends Actor with Stash{
  import SimpleMessageHandler._
  import context._
  import akka.pattern.pipe

  def receive = waitingForMessage
  def waitingForMessage: Receive = {

    case SimpleMessage() =>
      val futData:Future[Int] = ...
      futData.map(FinishSimpleMessage(_)) pipeTo self
      context.become(waitingToFinish(sender))
  }

  def waitingToFinish(originalSender:ActorRef):Receive = {
    case SimpleMessage() => stash()

    case FinishSimpleMessage(i) =>
      //Do whatever you need to do to finish here
      ...
      unstashAll()
      context.become(waitingForMessage)

    case Status.Failure(ex) =>
      //log error here
      unstashAll()
      context.become(waitingForMessage)      
  }
}

在这种方法中,我们处理一个 SimpleMessage,然后切换处理逻辑来存储所有后续收到的 SimpleMessage,直到我们得到 future 的结果。当我们得到结果时,无论是否失败,我们都会在等待 future 的同时取消所有收到的其他 SimpleMessage 并继续我们的快乐之路。

这个 actor 只是在两种状态之间来回切换,这样您一次只能完全处理一个 SimpleMessage,而无需阻止 Future。

关于scala - Actors 中 future 的执行上下文,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24209489/

相关文章:

scala - 需要帮助实现阻止有界邮箱

scala - 为什么 Scala 元组的索引是从 1 开始的?

scala - 为什么 sbt 不能解析 akka-remote?

scala - 如果尝试写入不再存在的客户端,远程角色框架会做什么?

java - 我可以为某些 Actor 的自动名称添加前缀吗?

scala - 如何在SBT中发布到多个存储库?

scala - 如何在 Scala 中声明新的部分函数类型

scala - 组合 future - 如何获得与 future 列表结果相关联的另一个变量

java - 如何从表中读取所有记录(> 1000万条记录)并将每条记录作为 block 响应?

scala - 在 Akka 中,如何将响应从下游参与者路由到正确的上游?