我编写了以下代码,以便在 Future 中调用某些异步工作时尝试确定参与者对于传入消息的行为(实际上,异步工作旨在模拟 Slick 的异步数据库 API):
val actor = system.actorOf(Props[WTFAsyncActor])
for (i <- 1 until 100) {
actor ! 'wtf + i.toString
Thread.sleep(200l) // yes I know actors should not invoke this method
}
Thread.sleep(10000l) // same as above
Actor的代码如下:
class WTFAsyncActor extends Actor with ActorLogging{
import scala.concurrent.ExecutionContext.Implicits.global
override def receive: Receive = {
case i@_ =>
log.info(s"[$i] external block pre future")
Thread.`yield`() // just trying to relinquish control to induce context switching
Future {
log.info(s"[$i] internal block pre sleep")
Thread.sleep(1000l) // yeah bad - trying to emulate something meaningful like slick
log.info(s"[$i] internal block post sleep")
}
log.info(s"[$i] external block post future")
}
}
我得到以下日志(示例运行的摘录)
10:17:58.408 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] external block pre future
10:17:58.420 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] external block post future
10:17:58.421 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] internal block pre sleep
10:17:58.599 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf2] external block pre future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf2] external block post future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf2] internal block pre sleep
10:17:58.800 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf3] external block pre future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf3] external block post future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf3] internal block pre sleep
10:17:59.001 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] external block pre future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] external block post future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] internal block pre sleep
10:17:59.202 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf5] external block pre future
10:17:59.206 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf5] external block post future
10:17:59.402 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf6] external block pre future
10:17:59.403 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf6] external block post future
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf1] internal block post sleep
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf6] internal block pre sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf2] internal block post sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf5] internal block pre sleep
10:17:59.608 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf7] external block pre future
我认为可以肯定地说,只要某个线程可用,无论 Future block 尚未完成其执行,新消息都会被视为进来。我对吗?
我要揭示的是接收 block 是否必须等待 Future block 完成才能处理新消息。 似乎没有。 (例如,在 wtf1 完成线程 3 上的计算之前,wtf2 进入调度程序线程 3)
此行为有任何警告吗?
请原谅这个看似愚蠢的问题。我没有深入了解 akka 代码库(目前我对 scala 和 akka 还太陌生)
最佳答案
在这种情况下,后续消息可以在任何 Future 完成之前通过 receive
进行处理。这种行为实际上没有任何警告,并且有一些事情需要注意。特别是,不要关闭 Future 中 Actor 的任何可变状态。 Future 可能与 Actor 处理的消息同时执行,从而破坏了对 Actor 状态的单线程访问的保证,并导致竞争条件,这可能会使 Actor 的状态处于无效位置。
当您想要从 Actor 内异步启动某些工作时,常见的模式是启动子 Actor 来完成该工作。子 Actor 可以将带有任何结果的消息发送回其父 Actor,并且您确信子 Actor 无法干扰父 Actor 的状态。
关于scala - akka 的 Actor 的接收方法与 Future( block )交互 - 在 Future 完成之前新消息可以进入吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31634797/