scala - 如果抛出异常,Akka Actor 不会终止

标签 scala routes actor akka fault-tolerance

我目前正在尝试开始使用 Akka,但遇到了一个奇怪的问题。我的 Actor 的代码如下:

class AkkaWorkerFT extends Actor {
  def receive = {
    case Work(n, c) if n < 0 => throw new Exception("Negative number")
    case Work(n, c) => self reply n.isProbablePrime(c);
  }
}

这就是我开始工作的方式:

val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()

这就是我关闭一切的方法:

futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill

现在发生的情况是,如果我向工作人员发送 n > 0 的消息(不会引发异常),则一切正常并且应用程序正常关闭。但是,一旦我向它发送一条导致异常的消息,应用程序就不会终止,因为仍有一个参与者在运行,但我无法弄清楚它来自哪里。

如果有帮助,这是有问题的线程的堆栈:

  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended) 
    Unsafe.park(boolean, long) line: not available [native method]  
    LockSupport.park(Object) line: 158  
    AbstractQueuedSynchronizer$ConditionObject.await() line: 1987   
    LinkedBlockingQueue<E>.take() line: 399 
    ThreadPoolExecutor.getTask() line: 947  
    ThreadPoolExecutor$Worker.run() line: 907   
    MonitorableThread(Thread).run() line: 680   
    MonitorableThread.run() line: 182   

PS:没有终止的线程不是任何工作线程,因为我添加了 postStop 回调,每个线程都正常停止。

PPS:Actors.registry.shutdownAll 解决了这个问题,但我认为 shutdownAll 只能作为最后的手段,不是吗?

最佳答案

处理akka actor内部问题的正确方法不是抛出异常,而是设置supervisor层次结构

"Throwing an exception in concurrent code (let’s assume we are using non-linked actors), will just simply blow up the thread that currently executes the actor.

There is no way to find out that things went wrong (apart from inspecting the stack trace). There is nothing you can do about it."

参见Fault Tolerance Through Supervisor Hierarchies (1.2)

* 注意 * 以上内容适用于旧版本的 Akka (1.2) 在较新的版本(例如 2.2)中,您仍然需要设置主管层次结构,但它会捕获子进程抛出的异常。例如

class Child extends Actor {
    var state = 0
    def receive = {
      case ex: Exception ⇒ throw ex
      case x: Int        ⇒ state = x
      case "get"         ⇒ sender ! state
    }
  }

在主管中:

class Supervisor extends Actor {
    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
        case _: ArithmeticException      ⇒ Resume
        case _: NullPointerException     ⇒ Restart
        case _: IllegalArgumentException ⇒ Stop
        case _: Exception                ⇒ Escalate
      }

    def receive = {
      case p: Props ⇒ sender ! context.actorOf(p)
    }
  }

参见Fault Tolerance Through Supervisor Hierarchies (2.2)

关于scala - 如果抛出异常,Akka Actor 不会终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6170227/

相关文章:

sql - Scala 如何在使用 sqlContext 的查询中处理 isnull 或 ifnull

scala - 如何建模类型安全的枚举类型?

scala - Scala 中丢失类型信息和类型类实现

asp.net - 如何在 Web 表单和 GridView 的 HyperLinkFields 中使用路由

java - Java 中的“ip 路由获取”

java - Libgdx 中的 stage 是如何工作的?

scala - IntelliJ IDEA、社区版还是旗舰版?

angularjs - Angular 路由的 htaccess 重定向

java - Akka Java FSM 示例

scala - akka Actor 向邮箱负责人发送消息