我目前正在尝试开始使用 Akka,但遇到了一个奇怪的问题。我的 Actor 的代码如下:
class AkkaWorkerFT extends Actor {
def receive = {
case Work(n, c) if n < 0 => throw new Exception("Negative number")
case Work(n, c) => self reply n.isProbablePrime(c);
}
}
这就是我开始工作的方式:
val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()
这就是我关闭一切的方法:
futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill
现在发生的情况是,如果我向工作人员发送 n > 0 的消息(不会引发异常),则一切正常并且应用程序正常关闭。但是,一旦我向它发送一条导致异常的消息,应用程序就不会终止,因为仍有一个参与者在运行,但我无法弄清楚它来自哪里。
如果有帮助,这是有问题的线程的堆栈:
Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: 158
AbstractQueuedSynchronizer$ConditionObject.await() line: 1987
LinkedBlockingQueue<E>.take() line: 399
ThreadPoolExecutor.getTask() line: 947
ThreadPoolExecutor$Worker.run() line: 907
MonitorableThread(Thread).run() line: 680
MonitorableThread.run() line: 182
PS:没有终止的线程不是任何工作线程,因为我添加了 postStop 回调,每个线程都正常停止。
PPS:Actors.registry.shutdownAll
解决了这个问题,但我认为 shutdownAll 只能作为最后的手段,不是吗?
最佳答案
处理akka actor内部问题的正确方法不是抛出异常,而是设置supervisor层次结构
"Throwing an exception in concurrent code (let’s assume we are using non-linked actors), will just simply blow up the thread that currently executes the actor.
There is no way to find out that things went wrong (apart from inspecting the stack trace). There is nothing you can do about it."
参见Fault Tolerance Through Supervisor Hierarchies (1.2)
* 注意 * 以上内容适用于旧版本的 Akka (1.2) 在较新的版本(例如 2.2)中,您仍然需要设置主管层次结构,但它会捕获子进程抛出的异常。例如
class Child extends Actor {
var state = 0
def receive = {
case ex: Exception ⇒ throw ex
case x: Int ⇒ state = x
case "get" ⇒ sender ! state
}
}
在主管中:
class Supervisor extends Actor {
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException ⇒ Resume
case _: NullPointerException ⇒ Restart
case _: IllegalArgumentException ⇒ Stop
case _: Exception ⇒ Escalate
}
def receive = {
case p: Props ⇒ sender ! context.actorOf(p)
}
}
关于scala - 如果抛出异常,Akka Actor 不会终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6170227/