akka - 如何取消 Akka Actor ?

标签 akka actor

我有一个 akka actor(worker)接收请求并回复它。请求处理可能需要 3-60 分钟。来电者(也是 Actor )目前正在使用 !!!并等待 future.get,但是如果需要,可以更改 Caller actor 的设计。另外,我目前正在使用 EventDriven 调度程序。

我如何取消(用户发起)请求处理,以便释放 worker actor 并返回到就绪状态以接收新请求?我希望有一个类似于 java.util.concurrent.Future 的取消方法的方法,但在 Akka 1.1.3 中找不到

编辑:

我们试图通过 completeWithException 获得我们正在寻找的行为:

object Cancel {
  def main(args: Array[String]) {
    val actor = Actor.actorOf[CancelActor].start
    EventHandler.info(this, "Getting future")
    val future = (actor ? "request").onComplete(x => EventHandler.info(this, "Completed!! " + x.get))
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.completeWithException(new Exception("cancel"))
    EventHandler.info(this, "Future is " + future.get)
  }
}

class CancelActor extends Actor {
  def receive = {
    case "request" =>
      EventHandler.info(this, "start")
      (1 to 5).foreach(x => {
        EventHandler.info(this, "I am a long running process")
        Thread.sleep(200L)
      })
      self reply "response"
      EventHandler.info(this, "stop")
  }
}

但这并没有停止长时间运行的进程。

    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Getting future
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] start
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Cancelling
    [ERROR]   [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-7] [ActorCompletableFuture] 
    java.lang.Exception: cancel
        at kozo.experimental.Cancel$.main(Cancel.scala:15)
...

    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] stop

相比之下,考虑 java.util.concurrent.Future 的行为:

object Cancel2 {
  def main(args: Array[String]) {
    val executor: ExecutorService = Executors.newSingleThreadExecutor()
    EventHandler.info(this, "Getting future")
    val future = executor.submit(new Runnable {
      def run() {
        EventHandler.info(this, "start")
        (1 to 5).foreach(x => {
          EventHandler.info(this, "I am a long running process")
          Thread.sleep(200L)
        })
      }
    })
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.cancel(true)
    EventHandler.info(this, "Future is " + future.get)
  }
}

哪个会停止长时间运行的进程

    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Getting future
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] start
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    Exception in thread "main" java.util.concurrent.CancellationException
...
    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Cancelling

最佳答案

您还可以检查 Actor 中 Future 的状态。

class MyActor extends Actor {
  def receive = {
    case msg =>
      while(!self.senderFuture.get.isCompleted) {
        performWork(msg)
      }
      self reply result
  }
  ...
}

这需要使用“?”发送消息或者“问”。 希望能帮助到你。

关于akka - 如何取消 Akka Actor ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7420753/

相关文章:

multithreading - 如何设置 Scala actor 独占使用单独的线程运行?

scala - 无状态 Akka Actor

scala - 如何在使用 actors 和 sbt 更改代码后重新加载 scala 应用程序

scala - Akka Supervisor 策略 - 正确的用例

scala - 把jdbc操作放在actor里面好不好?

android - 有没有一种方法可以检查 libgdx Scene2d 图像或 Actor 是否被触摸?

scala - Akka Actor 测试 : Automatic reply with a TestProbe

java - 问题设置为学习编程 future 、 promise 和参与者的资源

scala - 将 DAO 层容纳在 Scala Akka Actor 中

java - NonFatal 捕获 Throwable 可以吗?