scala - Actor 与 ExecutionContext 的理解

标签 scala parallel-processing akka executioncontext

据我了解 Akka 并行性,Actor 使用一个线程来处理每个传入消息。并且该线程包含一种状态。事实上,顺序消息不共享此状态。

但是 Actor 可能有一个 ExecutorContext 用于执行来自 Future 的回调。这就是我不再清楚地理解并行性的地方。

例如,我们有以下 Actor :

  class AnyActor(target: ActorRef) extends Actor { 
          implicit val ec: ExecutionContext = context.dispatcher

          def receive = {
               case messageA =>
                    val api = createApi()
                    val furureA: Future[F] = api.callA 
                        api.close()
                    futureA.pipeTo(sender()) 
               case messageB =>
                    val api = createApi()
                    val furureB: Future[F] = api.callB 
                        api.close()
                    futureB.pipeTo(sender()) 
             }
  }

假设,Actor 收到 messageA,并且 Thread1 创建 api 实例 - 让我们调用“api1”。还有一个带有 N 个线程的executionContext。其中一个线程用于从furureA 检索结果。

我不明白的是,这 N 个线程如何与 Thread1 关联。 ExecutionContext 只为 Thread1 创建?或者它也是为 Thread2 创建的(在其中处理 messageB)?

最佳答案

总的来说,参与者在调度程序上运行,该调度程序从池中选择一个线程,并运行该参与者的 Receive 来接收来自邮箱的一定数量的消息。一般来说,不能保证一个actor将在给定的线程上运行(忽略空洞的例子,比如带有单个线程的池,或者总是在特定线程中运行给定actor的调度程序)。

该调度程序也是一个 Scala ExecutionContext,它允许安排任意任务在其线程池上执行;此类任务包括 Future 回调。

那么在你的 Actor 中,当收到 messageA 时会发生什么?

  • 参与者调用 createApi() 并保存
  • 它调用 api 上的 callA 方法
  • 它关闭api
  • 当发送者可用时,它会安排转发 callA 的结果
  • 现在已准备好处理另一条消息,并且可能会或可能不会实际处理另一条消息

这实际上意味着什么取决于callA 的作用。如果callA在执行上下文上调度一个任务,一旦任务被调度并且回调已经安排好,它就会返回future;不能保证当 future 返回时任务或回调已经执行。一旦 future 返回,您的 actor 就会关闭 api(因此这可能会在任务或回调执行过程中的任何时刻发生)。

简而言之,根据 api 的实现方式(并且您可能无法控制它的实现方式)和实现细节,可以采用以下顺序

  • 线程 1(处理 messageA)在调度程序中设置任务
  • Thread1 关闭 api 并安排通过管道传输结果
  • 线程2开始执行任务
  • 线程 1 继续处理其他消息
  • Thread2 的任务失败,因为 api 已关闭

简而言之,当混合 Future 和 actor 时,Akka 中的“单线程幻象”就可以被打破:任意多个线程都可以操纵 actor 的状态。

在此示例中,由于 Futureland 和 actorland 之间的唯一共享状态是单个消息处理的本地状态,因此情况并没有那么糟糕:这里有效的一般规则是:

  • 一旦您将可变(例如可关闭)状态从参与者传递给 future (这包括,除非您完全确定正在发生什么,否则调用该有状态对象上返回 future 的方法),这是最好的 Actor 忘记该对象的存在

那么如何关闭api

好吧,假设在 messageA 之后 callA 没有使用 api 执行任何奇怪的操作(例如将实例保存在某些实例池中) > 处理完毕, future 也完成,没有任何东西可以访问api。因此,最简单也可能是最正确的做法是安排 api 在 future 完成后关闭,沿着这些思路

val api = createApi()
val futureA: Future[F] = api.callA

futureA.foreach { _ => api.close() }
futureA.pipeTo(sender()) 

关于scala - Actor 与 ExecutionContext 的理解,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70137297/

相关文章:

sql - 在SQL Server中并行执行3条SQL语句

比较各种 pthread 结构的性能

java - 管理 Akka Actor 系统生命周期

scala - 将数字相加

scala - 使用 scala.math.max 减少 float 组

xml - Scala XML,获取父节点具有属性值匹配的节点

scala - 在 Spark Streaming 中反序列化来自 Kafka 的 Avro 格式数据给出空字符串和 0 for long

r - 我可以嵌套 parallel:::parLapply() 吗?

stream - akka 流 ActorSubscriber 不适用于远程 Actor

scala - 来自 Akka 文档的 Actor DSL 示例