scala - akka 的 Actor 的接收方法与 Future( block )交互 - 在 Future 完成之前新消息可以进入吗?

标签 scala asynchronous akka future

我编写了以下代码,以便在 Future 中调用某些异步工作时尝试确定参与者对于传入消息的行为(实际上,异步工作旨在模拟 Slick 的异步数据库 API):

  val actor = system.actorOf(Props[WTFAsyncActor])

  for (i <- 1 until 100) {
    actor ! 'wtf + i.toString
    Thread.sleep(200l) // yes I know actors should not invoke this method
  }
  Thread.sleep(10000l) // same as above

Actor的代码如下:

class WTFAsyncActor extends Actor with ActorLogging{
  import scala.concurrent.ExecutionContext.Implicits.global
  override def receive: Receive = {

    case i@_ =>

      log.info(s"[$i] external block pre future")
      Thread.`yield`() // just trying to relinquish control to induce context switching

      Future {
        log.info(s"[$i] internal block pre sleep")
        Thread.sleep(1000l) // yeah bad - trying to emulate something meaningful like slick
        log.info(s"[$i] internal block post sleep")
      }

      log.info(s"[$i] external block post future")
  }
}  

我得到以下日志(示例运行的摘录)

10:17:58.408 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf1] external block pre future
10:17:58.420 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf1] external block post future
10:17:58.421 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf1] internal block pre sleep
10:17:58.599 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf2] external block pre future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf2] external block post future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf2] internal block pre sleep
10:17:58.800 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf3] external block pre future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf3] external block post future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf3] internal block pre sleep
10:17:59.001 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf4] external block pre future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf4] external block post future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf4] internal block pre sleep
10:17:59.202 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf5] external block pre future
10:17:59.206 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf5] external block post future
10:17:59.402 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf6] external block pre future
10:17:59.403 [wtf-async-test-akka.actor.default-dispatcher-2] INFO  org.wtf.test.WTFAsyncActor - ['wtf6] external block post future
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf1] internal block post sleep
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf6] internal block pre sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf2] internal block post sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf5] internal block pre sleep
10:17:59.608 [wtf-async-test-akka.actor.default-dispatcher-3] INFO  org.wtf.test.WTFAsyncActor - ['wtf7] external block pre future

我认为可以肯定地说,只要某个线程可用,无论 Future block 尚未完成其执行,新消息都会被视为进来。我对吗?

我要揭示的是接收 block 是否必须等待 Future block 完成才能处理新消息。 似乎没有。 (例如,在 wtf1 完成线程 3 上的计算之前,wtf2 进入调度程序线程 3)

此行为有任何警告吗?

请原谅这个看似愚蠢的问题。我没有深入了解 akka 代码库(目前我对 scala 和 akka 还太陌生)

最佳答案

在这种情况下,后续消息可以在任何 Future 完成之前通过 receive 进行处理。这种行为实际上没有任何警告,并且有一些事情需要注意。特别是,不要关闭 Future 中 Actor 的任何可变状态。 Future 可能与 Actor 处理的消息同时执行,从而破坏了对 Actor 状态的单线程访问的保证,并导致竞争条件,这可能会使 Actor 的状态处于无效位置。

当您想要从 Actor 内异步启动某些工作时,常见的模式是启动子 Actor 来完成该工作。子 Actor 可以将带有任何结果的消息发送回其父 Actor,并且您确信子 Actor 无法干扰父 Actor 的状态。

关于scala - akka 的 Actor 的接收方法与 Future( block )交互 - 在 Future 完成之前新消息可以进入吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31634797/

相关文章:

scala - 将列名添加到从csv文件读取的数据中而没有列名

scala - 迭代器上的Scala映射不会产生副作用

c# - 异步架构

c# - 错误 : return keyword must not be followed by an object expression in c# async code

javascript - 通过ajax和url更新页面而不重新加载页面(node.js)

class - Scala REPL "error: value > is not a member of type parameter T"

mysql - NoClassDefFound错误: CompanionConversion in scala activerecord

scala - 使用密封特征和密封抽象类作为基类之间的区别

scala - Finagle 和 Akka,为什么不一起使用呢?

scala - Akka Streams Reactive Kafka - 高负载下的 OutOfMemoryError