scala - 检查actor系统是否完成工作

标签 scala akka

我编写了一个基于 Akka 的自动机,它可以接受从 1 到 20 的罗马数字。该自动机工作正常,但在处理完整的输入后我想关闭系统。

因为我不知道自动机何时完成,所以我不知道何时发送停止信号。这会导致系统过早停止并且自动机无法完成其任务的问题。检查参与者系统中是否有尚未完成的任务的最佳方法是什么?

自动机:

import akka.actor.{Props, ActorSystem, ActorRef, Actor}

object RomanNumberAcceptor extends App {

  val allLegalNumbers = List("I", "II", "III", "IV", "V", "VI", "VII", "VIII", "IX", "X",
    "XI", "XII", "XIII", "XIV", "XV", "XVI", "XVII", "XVIII", "XIX", "XX"
  )

  val someWrongNumbers = List("IXX", "VV", "VX", "IIII", "IVV", "XXX", "XXI", "XXV", "IIX", "IIIV")

  for (number <- allLegalNumbers)
    State testNumber number

  for (number <- someWrongNumbers)
    State testNumber number

  // this stop signal is too early
  State.stop()
}

case class WordData(fullWord: String, remainder: String)

object State {
  val ErrorStateId: Int = -1

  private val data = Map(
    0 -> (Map('I' -> 1, 'V' -> 5, 'X' -> 10), false),
    1 -> (Map('I' -> 2, 'V' -> 4, 'X' -> 9), true),
    2 -> (Map('I' -> 3, 'V' -> ErrorStateId, 'X' -> ErrorStateId), true),
    3 -> (Map('I' -> ErrorStateId, 'V' -> ErrorStateId, 'X' -> ErrorStateId), true),
    4 -> (Map('I' -> ErrorStateId, 'V' -> ErrorStateId, 'X' -> ErrorStateId), true),
    5 -> (Map('I' -> 6, 'V' -> ErrorStateId, 'X' -> ErrorStateId), true),
    6 -> (Map('I' -> 2, 'V' -> ErrorStateId, 'X' -> ErrorStateId), true),
    9 -> (Map('I' -> ErrorStateId, 'V' -> ErrorStateId, 'X' -> ErrorStateId), true),
    10 -> (Map('I' -> 1, 'V' -> 5, 'X' -> 20), true),
    20 -> (Map('I' -> ErrorStateId, 'V' -> ErrorStateId, 'X' -> ErrorStateId), true),
    ErrorStateId -> (Map.empty[Char, Int], false)
  )

  val system = ActorSystem("RomanNumberAcceptor")

  val states: Map[Int, ActorRef] =
    for ((id, (map, accepted)) <- State.data)
      yield id -> system.actorOf(Props(State(id, map, accepted)), name = "s"+id)

  def testNumber(s: String) {
    states(0) ! WordData(s, s)
  }

  def stop() {
    for (a <- states.values)
      system stop a
    system.shutdown()
  }
}

case class State(id: Int, transMap: Map[Char, Int], accepted: Boolean) extends Actor {
  def receive = {
    case WordData(fullWord, _) if `id` == State.ErrorStateId =>
      println("Error: "+fullWord)
    case WordData(fullWord, remainder) if remainder.isEmpty =>
      println((if (accepted) "Success: " else "Error: ") + fullWord)
    case WordData(fullword, remainder) =>
      // maybe some heavy operation here
      (0 to 1e4.toInt).sum

      val nextAktor = transMap(remainder.head)
      State.states(nextAktor) ! WordData(fullword, remainder.tail)
  }
}

最佳答案

(以下假设 Scala 的非 Akka Actor。请参阅下面的编辑,了解如何调整它以与 Akka 一起使用)

我认为这个问题有两种可能的答案:

  1. 这没有意义。按照设计,每个状态参与者都没有完成,因为它仍然可以随时接受更多的输入单词,并且只是在等待它们。完成的就是发送要检查的单词的代码。

  2. 虽然您似乎没有为 WordData 消息返回任何有用的值,但您仍然可以等待返回值。您应该特别查看 !!!? 运算符的用法。通过将它们链接到不同的 Actor 上,您可以等待他们处理完这个词。但要小心 !?,因为可重入状态可能会导致死锁。

相反,我建议您为 WordData 消息定义一个 Boolean 答案,告诉您该单词是否被接受。然后,您可以使用 !! 检索 future (取决于另一个 future ,这取决于另一个 future ,依此类推)单词的长度,然后 testNumber 方法可以 block 检索接受值。如果您想并行测试多个单词,您还可以让 testNumber 返回 Future 本身,并阻止 App 中的所有这些 future调用 stop 之前的对象。

简而言之:您应该在 !! 运算符中查找 actor 以及 Future 类型。

编辑:抱歉造成困惑。正如 drexin 在评论中指出的那样,Akka 2 确实改变了这一点。请参阅Akka documentation of Futures了解如何发送消息以便检索 future。

至于仅检索None:当然,参与者必须返回某种有意义的值(如上面提出的Boolean)。请参阅documentation再次讨论如何发送来自参与者的回复(基本上,sender ! true)。当然,请注意,只有基本情况才能返回 truefalse。当你调用另一个 Actor 时,你将再次获得一个 future,为了避免死锁,你必须直接返回它。

因此,在调用方,您将必须连续等待这些 future,即等待最外面的 future 评估另一个 future。然后等待这个 future ,依此类推,直到检索到一个非 future 的 bool 结果值。

从 actor 内部调用下一个 actor 并返回相应 future(而不是 future 本身)的评估的问题意味着调用 actor 将被阻塞。因此,这会导致与传统 Scala actor 在使用阻塞 !? 调用时出现的死锁行为相同。

关于scala - 检查actor系统是否完成工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10800011/

相关文章:

java - 使用 Java 和 Maven 的 Akka 微内核有什么意义?

Akka 的 Java 聚合器

scala - 使用Circe将字段添加到json

scala - SBT无法解析org.apache.hadoop

java - 为什么 Scala 从对象的 main 方法而不是类的静态 main 方法开始?

scala - 如何将数据帧转换为 JSON 并使用 key 写入 kafka 主题

multithreading - Akka 调度器和路由器

multithreading - Akka Actor 优先级

scala - 与 Actor 并行化的最小工作单元是什么?

scala - Intellij Scala worksheet 运行类型差异解释