scala - postRestart 和 preRestart 方法在 akka actots 中没有被调用

标签 scala akka actor lifecycle akka-supervision

我正在关注this tutorial这是我的代码

case class ArtGroupDeleteFromES (uuidList:List[String]) 
class ArtGroupDeleteESActor extends Actor{
val log = LoggerFactory.getLogger(this.getClass)
    override def preStart() {
      log.debug("preStart  Starting ArtGroupDeleteESActor instance hashcode # {}",
       this.hashCode())
      }

    override def postStop() {
      log.debug("postStop Stopping ArtGroupDeleteESActor instance hashcode # {}",
       this.hashCode())
      }

    override def preRestart(reason: Throwable, message: Option[Any]) {
      log.debug("I am restarting")
      log.debug("ArtGroupDeleteESActor: preRestart")
      log.debug(s" MESSAGE: ${message.getOrElse("")}")
      log.debug(s" REASON: ${reason.getMessage}")
      super.preRestart(reason, message)
      }

    override def postRestart(reason: Throwable) {
      log.debug("restart completed!")
      log.debug("ArtGroupDeleteESActor: postRestart")
      log.debug(s" REASON: ${reason.getMessage}")
      super.postRestart(reason)
      }
def receive = {
  case ArtGroupDeleteFromES(uuidList) =>
 throw new Exception("Booom")
  sender ! true
   }
  case  message => 
    log.warn("Received unknown message: {}", message)
    unhandled(message)
 }

}

这是我向这位 Actor 发送消息的方式

class ArtGroupDeletionActor extends Actor{

  val log = LoggerFactory.getLogger(this.getClass)
 override val supervisorStrategy = OneForOneStrategy(
                                    maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
    case _:Exception => Restart
  }
 val artGroupDeleteESActor=context.actorOf(Props[ArtGroupDeleteESActor]
      .withDispatcher("akka.actor.ArtGroupDeleteESActor-dispatcher")
      ,name = "ArtGroupDeleteESActor")

   def receive = {

    case DeleteArtGroup(uuidList) =>
      val future1  = ask(artGroupDeleteESActor, ArtGroupDeleteFromES(uuidList)).mapTo[Boolean]
      var isDeletedfromES = Await.result(future1, timeout.duration)
    case message =>
      log.warn("Unhandled message received : {}", message)
      unhandled(message)
  }
}

object test extends App{
 val artGroupDeletionActor=system.actorOf(Props[ArtGroupDeletionActor]
      .withDispatcher("akka.actor.ArtGroupDeletionActor-dispatcher")
      ,name = "ArtGroupDeletionActor")
     artGroupDeletionActor ! DeleteArtGroup(List("123"))
}

PostRestart() 和 preRestart() 方法没有被调用,但是 preStart() 和 postStop() 被调用,请指导我哪里做错了

最佳答案

(为简单起见,从现在开始我将称您的 Actor 为 ParentChild)

这里发生的是,当Child.receive内部发生异常时,它不会向Parent发送响应,而是actor系统发送一些控制指令为监管策略。但是,Parent 被阻塞在 Await 上,等待 future1 完成,这仅在超过 timeout 后发生,并且然后,反过来,在 Parent.receive 内部抛出一个 TimeoutException,杀死(重新启动)Parent actor 本身,从而终止然后将 Child 中的异常传递给 deadLetters,而不会重新启动 Child

你不应该、永远、永远阻挡在 Actor 内部,所以这是不正确的:

  val future1 = ask(artGroupDeleteESActor, ArtGroupDeleteFromES(uuidList)).mapTo[Boolean]
  var isDeletedfromES = Await.result(future1, timeout.duration)

相反,您必须使用某种消息标识来区分并发环境中的一个回复与另一个回复,或者向 Future 添加一个 onComplete 并向 self 发送一条消息 在闭包中(注意:除了向 Future 发送消息之外,不应在闭包内执行任何逻辑!)。

所以,选项A:

case class ArtGroupDeleteFromES(id: Long, uuidList: List[String])
case class ArtGroupDeleteFromESResult(id: Long, success: Boolean)

class Parent extends Actor {
  override val supervisionStrategy = ...
  var msgId = 0L
  var pendingRequesters = Map.empty[Long, ActorRef]
  val child = context.actorOf(Props[Child])

  def nextId = {
    msgId += 1
    msgId
  }

  def receive = {
    case DeleteArtGroup(uuidList) =>
      val id = nextId
      pendingRequesters += id -> sender() // store a reference to the sender so that you can send it a message when everything completes
      child ! DeleteArtGroupFromES(nextId, uuidList)
    case ArtGroupDeleteFromESResult(id, success) =>
      // process result...
      pendingRequesters(id) ! "done"
      pendingRequesters -= id
  }
}

选项B:

case class ArtGroupDeleteFromES(uuidList: List[String])
case class ArtGroupDeleteFromESResult(replyTo: ActorRef, success: Boolean)

class Parent extends Actor {
  override val supervisionStrategy = ...
  val child = context.actorOf(Props[Child])

  def receive = {
    case DeleteArtGroup(uuidList) =>
      val requester = sender() // when the future completes, sender may have already changed, so you need to remember it
      (child ? DeleteArtGroupFromES(uuidList)).onComplete {
        case Success(success) => self ! ArtGroupDeleteFromESResult(requester, success)
        case Failure(e) =>
          log.warn("Could not delete...", e) 
          self ! ArtGroupDeleteFromESResult(requester, success = false)
  }
}

关于scala - postRestart 和 preRestart 方法在 akka actots 中没有被调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37608915/

相关文章:

scala - 如何设置 Scala 2.10 并行集合的默认线程数?

java - Riak:通过 Java/Scala 在键上创建索引

scala - 在规范 2 中创建夹具

java - Vert.x 中队列和 verticle 数量的限制

使用 Akka 进行 Scala Testkit 单元测试

class - 将枚举参数传递给案例类不起作用

multithreading - 让 Actor sleep

java - 玩2.5升级报错: CompletionException - There is no HTTP Context available from here

akka - 将 Java 线程转换为 AKKA Actor

java - 使用 Props 初始化 actor