据我了解 Akka 并行性,Actor 使用一个线程来处理每个传入消息。并且该线程包含一种状态。事实上,顺序消息不共享此状态。
但是 Actor 可能有一个 ExecutorContext 用于执行来自 Future 的回调。这就是我不再清楚地理解并行性的地方。
例如,我们有以下 Actor :
class AnyActor(target: ActorRef) extends Actor {
implicit val ec: ExecutionContext = context.dispatcher
def receive = {
case messageA =>
val api = createApi()
val furureA: Future[F] = api.callA
api.close()
futureA.pipeTo(sender())
case messageB =>
val api = createApi()
val furureB: Future[F] = api.callB
api.close()
futureB.pipeTo(sender())
}
}
假设,Actor 收到 messageA,并且 Thread1 创建 api 实例 - 让我们调用“api1”。还有一个带有 N 个线程的executionContext。其中一个线程用于从furureA 检索结果。
我不明白的是,这 N 个线程如何与 Thread1 关联。 ExecutionContext 只为 Thread1 创建?或者它也是为 Thread2 创建的(在其中处理 messageB)?
最佳答案
总的来说,参与者在调度程序上运行,该调度程序从池中选择一个线程,并运行该参与者的 Receive
来接收来自邮箱的一定数量的消息。一般来说,不能保证一个actor将在给定的线程上运行(忽略空洞的例子,比如带有单个线程的池,或者总是在特定线程中运行给定actor的调度程序)。
该调度程序也是一个 Scala ExecutionContext,它允许安排任意任务在其线程池上执行;此类任务包括 Future
回调。
那么在你的 Actor 中,当收到 messageA
时会发生什么?
- 参与者调用
createApi()
并保存 - 它调用
api
上的callA
方法 - 它关闭
api
- 当发送者可用时,它会安排转发
callA
的结果 - 现在已准备好处理另一条消息,并且可能会或可能不会实际处理另一条消息
这实际上意味着什么取决于callA
的作用。如果callA
在执行上下文上调度一个任务,一旦任务被调度并且回调已经安排好,它就会返回future;不能保证当 future 返回时任务或回调已经执行。一旦 future 返回,您的 actor 就会关闭 api
(因此这可能会在任务或回调执行过程中的任何时刻发生)。
简而言之,根据 api
的实现方式(并且您可能无法控制它的实现方式)和实现细节,可以采用以下顺序
- 线程 1(处理
messageA
)在调度程序中设置任务 - Thread1 关闭
api
并安排通过管道传输结果 - 线程2开始执行任务
- 线程 1 继续处理其他消息
- Thread2 的任务失败,因为
api
已关闭
简而言之,当混合 Future
和 actor 时,Akka 中的“单线程幻象”就可以被打破:任意多个线程都可以操纵 actor 的状态。
在此示例中,由于 Future
land 和 actorland 之间的唯一共享状态是单个消息处理的本地状态,因此情况并没有那么糟糕:这里有效的一般规则是:
- 一旦您将可变(例如可关闭)状态从参与者传递给 future (这包括,除非您完全确定正在发生什么,否则调用该有状态对象上返回 future 的方法),这是最好的 Actor 忘记该对象的存在
那么如何关闭api
?
好吧,假设在 messageA
之后 callA
没有使用 api
执行任何奇怪的操作(例如将实例保存在某些实例池中) > 处理完毕, future 也完成,没有任何东西可以访问api
。因此,最简单也可能是最正确的做法是安排 api
在 future 完成后关闭,沿着这些思路
val api = createApi()
val futureA: Future[F] = api.callA
futureA.foreach { _ => api.close() }
futureA.pipeTo(sender())
关于scala - Actor 与 ExecutionContext 的理解,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70137297/