scala - 在 Akka 中推迟消息的正确方法

标签 scala akka

我正在使用 akka 集群以在两个阶段执行分布式计算。先phaseA然后 phaseB .为了处理阶段,我使用 akka 的 FSM。

没有硬同步,因此其中一个节点可能会到达 phaseB而其他人仍在phaseA .

问题是,phaseB 中的一个发送 phaseB-related给其他人的消息(他们还在 phaseA 中) 是什么导致他们松动 phaseB-related消息。

现在我使用简单的技巧来推迟未知消息:

case any => self ! any

但是 IMO 这不是正确的方法。我知道我也可以安排any使用 akka 调度程序,但我也不喜欢这个。

这是简化的代码:
package whatever

import akka.actor._

object Test extends App {

  case object PhaseA
  case object PhaseB

  class Any extends Actor {

    def phaseA: Receive = {
      case PhaseA => {
        context.become(phaseB)
        println("in phaseB now")
      }
      case any => self ! any
    }

    def phaseB: Receive = {
      case PhaseB => println("got phaseB message !")
    }

    def receive = phaseA

  }

  val system = ActorSystem("MySystem")
  val any = system.actorOf(Props(new Any), name = "any")
  any ! PhaseB
  any ! PhaseA
}

在这种情况下推迟消息的正确方法是什么?

最佳答案

您可以存储消息以供以后处理。混音 akka.actor.Stash进入你的 Actor 和stash()您的 phaseB以后的消息。

当你的 FSM 在 phaseA并收到 phaseB留言,调用stash() .当那个 Actor 然后过渡到phaseB状态,请调用 unstashAll()并且所有隐藏的消息都将被重新传递。

关于scala - 在 Akka 中推迟消息的正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28005231/

相关文章:

scala - 对于在映射器之间共享信息的增强版 MapReduce,什么是好的应用程序?

scala - 如何在运行应用程序之前创建一个自定义 sbt 任务来设置 java 选项

Scala:在一次调用中将相同的函数应用于 2 个列表

scala - Akka 流卡夫卡: No configuration setting found for key 'kafka-clients'

java - akka Actor 没有按预期工作

scala - Akka Reactive Streams 总是落后一条消息

java - 如何并行创建100万个 Actor ?

scala - 在 Scala 中为 MultilayerPerceptronClassifier 准备数据

Scala case语法理解

scala - Play : error: not found: value playScalaSettings