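I am using a BackoffSupervisor strategy to create a child actor that has to process some messages. I want to implement a very simple restart strategy in which, in case of an exception:
- The child propagates the failing message to the supervisor
- The supervisor restarts the child and sends the failing message again
- The supervisor gives up after 3 retries
- Akka persistence is not an option
What I have so far is this:
Supervisor definition: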
val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    childProps,
    childName = cmd.hashCode.toString,
    minBackoff = 1.seconds,
    maxBackoff = 2.seconds,
    randomFactor = 0.2
  )
  .withSupervisorStrategy(
    OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
      case msg: MessageException => {
        println("caught specific message!")
        SupervisorStrategy.Restart
      }
      case _: Exception => SupervisorStrategy.Restart
      case _ ⇒ SupervisorStrategy.Escalate
    })
)
val sup = context.actorOf(supervisor)
sup ! cmd
The child actor that is supposed to send the e-mail, but fails (throws some exception) and propagates the exception back to the supervisor:
class SenderActor() extends Actor {
  def fakeSendMail():Unit = {
    Thread.sleep(1000)
    throw new Exception("surprising exception")
  }
  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case t => throw MessageException(cmd, t)
      }
  }
}
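In the code above I wrap any exception in the custom class MessageException, which gets propagated to the SupervisorStrategy, but how do I propagate it further to the new child to force reprocessing? Is this the right approach?
EDIT. I tried resending the message to the actor in the preRestart hook, but somehow the hook is not triggered: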
class SenderActor() extends Actor {
  def fakeSendMail():Unit = {
    Thread.sleep(1000)
    // println("mail sent!")
    throw new Exception("surprising exception")
  }
  override def preStart(): Unit = {
    println("child starting")
  }
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    reason match {
      case m: MessageException => {
        println("aaaaa")
        message.foreach(self ! _)
      }
      case _ => println("bbbb")
    }
  }
  override def postStop(): Unit = {
    println("child stopping")
  }
  override def receive: Receive = {
    case cmd: NewMail =>
      println("new mail received routee")
      try {
        fakeSendMail()
      } catch {
        case t => throw MessageException(cmd, t)
      }
  }
}
This gives me output similar to the following:
new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting
But there is no log from the preRestart hook.
Best answer
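The reason the child's preRestart hook is not invoked is that Backoff.onFailure uses BackoffOnRestartSupervisor under the hood, which replaces the default restart behavior with a stop-and-delayed-start behavior that is consistent with the backoff policy. In other words, when using Backoff.onFailure, the child's preRestart method is not called when the child is restarted, because the underlying supervisor actually stops the child and then starts it again later. (Using Backoff.onStop could trigger the child's preRestart hook, but that is tangential to the present discussion.)
The BackoffSupervisor API does not support automatically resending a message when the supervisor's child restarts: you have to implement this behavior yourself. One idea for retrying messages is to let the BackoffSupervisor's supervisor handle it. For example: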
val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    ...
  ).withReplyWhileStopped(ChildIsStopped)
  .withSupervisorStrategy(
    OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
      case msg: MessageException =>
        println("caught specific message!")
        self ! Error(msg.cmd) // replace cmd with whatever the property name is
        SupervisorStrategy.Restart
      case ...
    })
)
val sup = context.actorOf(supervisor)
// `timers` is only available if the enclosing actor mixes in akka.actor.Timers.
def receive = {
  case cmd: NewMail =>
    sup ! cmd
  case Error(cmd) =>
    timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
    // We assume that NewMail has an id field. Also, adjust the time as needed.
  case Replay(cmd) =>
    sup ! cmd
  case ChildIsStopped =>
    println("child is stopped")
}
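In the code above, the NewMail message embedded in the MessageException is wrapped in a custom case class (to easily distinguish it from a "normal"/new NewMail message) and sent to self. In this context, self is the actor that created the BackoffSupervisor. This enclosing actor then uses a single timer to replay the original message at some point. That point should be far enough in the future that the BackoffSupervisor can potentially exhaust SenderActor's restart attempts, so that the child has ample opportunity to get into a "good" state before it receives the resent message. Obviously, this example resends a message only once, regardless of the number of child restarts.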
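The snippet above relies on several message types that the answer never defines. A minimal sketch of what they might look like follows; every definition here is an assumption made for illustration (including the `id: Long` field on `NewMail` and the `cmd` field on `MessageException`), not the asker's actual code.

```scala
// Hypothetical message protocol for the retry example; all names and fields are assumptions.

// The command to process; assumed to carry a unique id that can serve as a timer key.
final case class NewMail(id: Long)

// Wraps the failing command so the supervisor strategy can recover it from the exception.
final case class MessageException(cmd: NewMail, cause: Throwable)
  extends Exception("Could not process message", cause)

// Sent by the supervisor strategy to the enclosing actor when processing of cmd failed.
final case class Error(cmd: NewMail)

// Scheduled by the enclosing actor to resend cmd to the BackoffSupervisor later.
final case class Replay(cmd: NewMail)

// Reply configured via withReplyWhileStopped for messages that arrive while the child is stopped.
case object ChildIsStopped
```

Wrapping the command in `Error` and `Replay` rather than resending the bare `NewMail` keeps first deliveries and retries distinguishable in the enclosing actor's `receive`.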
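Another idea is to create a BackoffSupervisor-SenderActor pair for every NewMail message, and have the SenderActor send the NewMail message to itself in the preStart hook. One concern with this approach is the cleanup of resources, i.e., shutting down the BackoffSupervisors (which will, in turn, shut down their respective SenderActor children) when processing succeeds or when the child restarts are exhausted. A map of NewMail ids to (ActorRef, Int) tuples (in which the ActorRef is a reference to a BackoffSupervisor actor, and the Int is the number of restart attempts) would be helpful in this case: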
class Overlord extends Actor {
  var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long
  def receive = {
    case cmd: NewMail =>
      val childProps = Props(new SenderActor(cmd, self))
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          ...
        ).withSupervisorStrategy(
          OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
            case msg: MessageException =>
              println("caught specific message!")
              self ! Error(msg.cmd)
              SupervisorStrategy.Restart
            case ...
          })
      )
      val sup = context.actorOf(supervisor)
      state += (cmd.id -> (sup, 0))
    case ProcessingDone(cmdId) =>
      state.get(cmdId) match {
        case Some((backoffSup, _)) =>
          context.stop(backoffSup)
          state -= cmdId
        case None =>
          println(s"${cmdId} not found")
      }
    case Error(cmd) =>
      val cmdId = cmd.id
      state.get(cmdId) match {
        case Some((backoffSup, numRetries)) =>
          if (numRetries == 3) {
            println(s"${cmdId} has already been retried 3 times. Giving up.")
            context.stop(backoffSup)
            state -= cmdId
          } else
            state += (cmdId -> (backoffSup, numRetries + 1))
        case None =>
          println(s"${cmdId} not found")
      }
    case ...
  }
}
Note that SenderActor in the above example takes a NewMail and an ActorRef as constructor arguments. The latter argument allows the SenderActor to send a custom ProcessingDone message to the enclosing actor:
class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
  override def preStart(): Unit = {
    println(s"child starting, sending ${cmd} to self")
    self ! cmd
  }
  def fakeSendMail(): Unit = ...
  def receive = {
    case cmd: NewMail => ...
  }
}
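Obviously, the SenderActor is set up to fail every time with the current implementation of fakeSendMail. I'll leave to you the additional changes needed in SenderActor to implement the happy path, in which SenderActor sends a ProcessingDone message to target.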
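For illustration only, one possible shape of that happy path is sketched below. It assumes Akka classic actors on the classpath; `sendMail` is a hypothetical stand-in for the real mail-sending logic, and the message types are redefined here under the same assumptions as the rest of the answer (`NewMail` has a `Long` id, `ProcessingDone` carries that id).

```scala
import akka.actor.{Actor, ActorRef}
import scala.util.control.NonFatal

// Assumed message types; the original post does not show their definitions.
final case class NewMail(id: Long)
final case class ProcessingDone(cmdId: Long)
final case class MessageException(cmd: NewMail, cause: Throwable)
  extends Exception("Could not process message", cause)

class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
  // Runs on every (re)start performed by the BackoffSupervisor,
  // so each fresh child instance retries the same command.
  override def preStart(): Unit = self ! cmd

  // Hypothetical placeholder: replace with the real mail-sending code, which may throw.
  def sendMail(mail: NewMail): Unit = ()

  def receive: Receive = {
    case mail: NewMail =>
      try {
        sendMail(mail)
        // Happy path: let the enclosing actor stop this supervisor/child pair.
        target ! ProcessingDone(mail.id)
      } catch {
        // Failure path: wrap the command so the supervisor strategy can see it.
        case NonFatal(t) => throw MessageException(mail, t)
      }
  }
}
```

Catching with `NonFatal` rather than a bare `case t` is a choice of this sketch: it lets fatal JVM errors propagate unwrapped.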
Regarding scala - sending a message to an actor after a restart from the Supervisor, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48446194/