scala - How do I send a message from the Supervisor to an Actor after the Actor restarts?

Tags: scala akka actor akka-supervision

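Requirements:
- There must be a long-running process (a daemon) that should run forever
- If any exception occurs, it should be restarted, but if it then fails two more times, no further restart should be attempted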

The problem I'm facing:
- The Actor is restarted, but no message is sent to it again

What I have:

Main class

package com.learner.ahka.runforever

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RaceEvent extends App {
  val config = ConfigFactory.parseString( """
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
                                          """)
  val system = ActorSystem.create("race", config)
  val coach = system.actorOf(Coach.props(), "coach")
  coach ! GetSetGo
}

Supervisor

package com.learner.ahka.runforever

import akka.actor.SupervisorStrategy.{Escalate, Restart}
import akka.actor._
import akka.event.LoggingReceive

import scala.concurrent.duration._

case object GetSetGo

object Coach {
  def props(): Props = Props[Coach]
}

class Coach() extends Actor with ActorLogging {

  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  // 5.seconds avoids the postfix-operator syntax, which needs scala.language.postfixOps
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
    case _: RuntimeException => Restart
  }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! GoForIt
  }
}

Actor

package com.learner.ahka.runforever

import akka.actor.Status.Failure
import akka.actor.{Actor, ActorLogging, Props}
import akka.event.LoggingReceive
import akka.pattern.pipe

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {

  import context.dispatcher

  override def receive: Receive = LoggingReceive {
    case GoForIt => race.start pipeTo self
    case Failure(throwable) => throw throwable
  }
}

The actual work

package com.learner.ahka.runforever

import scala.concurrent.Future

case object GoForIt

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {

  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = future

  val future = Future {
    for (i <- 1 to 3) {
      println("I am a Marathon Runner!")
      Thread.sleep(1000)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}

Logs

[DEBUG] [05/30/2015 16:03:35.696] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 16:03:35.698] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 16:03:35.704] [race-akka.actor.default-dispatcher-4] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#-1391310385]
[DEBUG] [05/30/2015 16:03:35.706] [race-akka.actor.default-dispatcher-3] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@191ba186)
[DEBUG] [05/30/2015 16:03:35.710] [race-akka.actor.default-dispatcher-2] [akka://race/user] now supervising Actor[akka://race/user/coach#-1161587711]
I am a Marathon Runner!
[DEBUG] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@66f0f319)
[DEBUG] [05/30/2015 16:03:35.722] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@72f67980)
[DEBUG] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#755574648]
[DEBUG] [05/30/2015 16:03:35.723] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] received handled message GetSetGo
[DEBUG] [05/30/2015 16:03:35.725] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
I am a Marathon Runner!
I am a Marathon Runner!
[DEBUG] [05/30/2015 16:03:38.739] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 16:03:38.752] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
    at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[DEBUG] [05/30/2015 16:03:38.753] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarting
[DEBUG] [05/30/2015 16:03:38.755] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarted

Update
If I don't delegate to a Future, everything works as expected, even across restarts. But when I delegate to a Future, the Future is not executed after the actor restarts. See here
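
One detail in the Marathon class above may contribute to this, independent of the restart itself: `future` is declared as a `val`, so its body runs once when the Marathon instance is constructed, and every call to `start` returns that same, possibly already failed, Future. The sketch below is an illustration only; `EagerRace`, `FreshRace`, and the two counters are hypothetical names that do not appear in the original code. It counts how often each Future body runs when the Future is held in a `val` versus created by a `def`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureValVsDef {
  // Hypothetical counters: how many times each Future body actually runs.
  val valRuns = new AtomicInteger(0)
  val defRuns = new AtomicInteger(0)

  class EagerRace {
    // Evaluated once, when the instance is constructed (like Marathon.future).
    val future: Future[Int] = Future(valRuns.incrementAndGet())
    def start: Future[Int] = future
  }

  class FreshRace {
    // Evaluated on every call, so each start runs the body again.
    def start: Future[Int] = Future(defRuns.incrementAndGet())
  }

  def main(args: Array[String]): Unit = {
    val eager = new EagerRace
    val fresh = new FreshRace
    // Call start twice on each and wait for completion.
    Await.result(eager.start, 5.seconds)
    Await.result(eager.start, 5.seconds)
    Await.result(fresh.start, 5.seconds)
    Await.result(fresh.start, 5.seconds)
    println(s"val-backed runs: ${valRuns.get}, def-backed runs: ${defRuns.get}")
  }
}
```

Running `main` reports one run for the val-backed Future and two for the def-backed one. Even with a def-backed Future, the restarted Runner still has to receive GoForIt again before it starts anything.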

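Best answer

You can override the postRestart method to send a message back to the parent, notifying it of the restart, then handle that new message type in the parent and respond accordingly. If context.parent does not work well for this (I tend not to rely on it), have the Coach actor pass its self actor reference as a new constructor argument when it instantiates the Runner.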
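
The approach described above could look roughly like the following. This is a minimal sketch, not the answerer's actual code: the `RunnerRestarted` message is a hypothetical name, the Runner is simplified to fail directly on GoForIt instead of piping a Future, and the Coach passes `self` to the Runner's constructor rather than relying on `context.parent`.

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Restart
import scala.concurrent.duration._

case object GetSetGo
case object GoForIt
case object RunnerRestarted // hypothetical message, not in the original code

class Runner(coach: ActorRef) extends Actor {
  // Runs on the fresh instance after a restart; tell the coach we are back.
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason) // the default implementation calls preStart()
    coach ! RunnerRestarted
  }

  override def receive: Receive = {
    // Simplified: fail immediately so the supervisor strategy kicks in.
    case GoForIt => throw new RuntimeException("MarathonRunner is tired")
  }
}

class Coach extends Actor {
  // Pass self explicitly instead of relying on context.parent in the child.
  private val runner = context.actorOf(Props(classOf[Runner], self), "runner")

  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5.seconds) {
      case _: RuntimeException => Restart
    }

  override def receive: Receive = {
    // Kick off the work initially and again after every restart.
    case GetSetGo | RunnerRestarted => runner ! GoForIt
  }
}
```

With maxNrOfRetries = 2, the Coach keeps re-sending GoForIt after each restart until the retry limit is exceeded, at which point the supervisor strategy stops the Runner and no further RunnerRestarted arrives.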

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/30552269/
