scala - 在接收方法中调用 Future 并在此之后停止 actor

标签 scala akka actor future

我正在创建一群 Actor ,他们做了一些工作,然后他们就停止了。在其中一些参与者中,我调用了返回 Future 的第 3 方 API。 .

MyActor extends Actor 
{
.... 

def receive = {

   case MsgA => {
    ... 
    //this evaluates to a Future
    val p : Future = someFutureAPICall()
    //stop the actor 
    context stop self 
    } 

}

}

现在自从Future是非阻塞的,actor 将在此之后立即停止(?),即使 Future还没有完成。在这种情况下,预期的行为是什么?

例如,如果我有一个 onCompleteFuture ,即使 Actor 已经停止,那会被执行吗?
MyActor extends Actor 
{
.... 

def receive = {

   case MsgA => {
    ... 
    //this evaluates to a Future
    val p : Future = someFutureAPICall()

    p.onComplete {
      //will these expressions ever be evaluated ? 
      case Success(x) => log.info("Success")
      case Failure(f) => log.info("Failure") 
    }
    //stop the actor 
    context stop self 
    } 

}

}

最佳答案

是的,返回 Future(第 3 方 API)的代码将立即执行并返回不完整的 Future。

将这个 Future 执行到完整性与开始它活着的 Actor 无关。

如果您不再需要那个 actor,您不需要等待 Future 完成,您可以像在第一个示例中那样停止 actor。

如果你需要在那个 Actor 身上做一些事情,你可以安装一个 onComplete回调那个 Future。一旦 Future 完成,它可以向 actor 发送消息以停止。例如像这样:

val myActor = self // Don't close over unstable actor reference by using self directly
p.onComplete {
  case Success(x) => myActor ! x; myActor ! akka.actor.PoisonPill // sends result to be processed and then stops actor
  case Failure(f) => myActor ! akka.actor.PoisonPill // stops actor
}

编辑

评论中建议的另一种选择是使用 pipeTo使用模式。它几乎做同样的事情。以下是它在 Akka 库中的实现方式:
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
  future onComplete {
    case Success(r) ⇒ recipient ! r
    case Failure(f) ⇒ recipient ! Status.Failure(f)
  }
  future
}

在创建 Future 后,您可以通过以下方式调用它:
p pipeTo myActor

主要区别在于您的actor在收到消息后必须自行关闭,并且故障通过Failure清楚地传达给actor。信息。这种方法使用起来更安全一些,因为你必须通过一个 ActorRef并且您不必记住将它(自我)复制到变量中。

关于scala - 在接收方法中调用 Future 并在此之后停止 actor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24878806/

相关文章:

.net - MailboxProcessor 性能问题

java - java中的任何参与者模型是否允许完成任务分组?

azure - Databricks parquet 读取时间太长

scala - Scala 和 Akka 中的 Future

java - Thrift 异常中的异常消息是如何工作的?

unit-testing - Akka 中的 TestKit、TestActorRef 和 TestProbe 是什么?

scala - 集群 Akka/Scala actor

java - 查找akka中所有 child Actor 的状态

scala - 如何使用 Spark Structured Streaming 将数据从 Kafka 主题流式传输到 Delta 表

scala - 在 Scala 中,如何从可序列化的类型创建 TypeTag?