multithreading - 如何保证akka中fork的顺序

标签 multithreading scala akka future

我们正在为每个(小的)传入消息组创建一个参与者链,以保证它们的顺序处理和管道(组通过公共(public) ID 区分)。问题是我们的链有 fork ,比如 A1 -> (A2 -> A3 | A4 -> A5) 我们应该保证通过 A2 -> A3A4 -> A5。当前的遗留解决方案是阻止 A1 actor 直到当前消息被完全处理(在其中一个子链中):

def receive { //pseudocode
    case x => ...
      val f = A2orA4 ? msg
      Await.complete(f, timeout)
}

因此,应用程序中的线程数与正在处理的消息数成正比,无论这些消息是事件的还是正在异步等待来自外部服务的某些响应。它与 fork-join(或任何其他动态)池一起工作大约两年,但当然不能与固定池一起工作,并且在高负载的情况下会极大地降低性能。不仅如此,它还会影响 GC,因为每个阻塞的 fork-actor 都会在内部保存冗余的先前消息的状态。

即使有背压,它也会创建比收到的消息多 N 倍的线程(因为流中有 N 个顺序 fork ),这仍然很糟糕,因为处理一条消息需要很长时间,但 CPU 不多。所以我们应该处理尽可能多的消息,因为我们有足够的内存。我提出的第一个解决方案 - 将链线性化,如 A1 -> A2 -> A3 -> A4 -> A5。还有更好的吗?

最佳答案

更简单的解决方案是将最后收到的消息的 future 存储到参与者的状态中,并将其与之前的 future 链接起来:

def receive = process(Future{new Ack}) //completed future
def process(prevAck: Future[Ack]): Receive = { //pseudocode
    case x => ...
        context become process(prevAck.flatMap(_ => A2orA4 ? msg))
}

因此它将创建没有任何阻塞的 future 链。 future 完成后链将被删除(最后一个除外)。

关于multithreading - 如何保证akka中fork的顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28044971/

相关文章:

scala - Apache scala Spark 数据帧列中稀疏向量的大小

scala - 有没有不用sbt安装scala的方法?

scala - 如何解构 Spray API HTTPResponse?

scala - 集群中参与者之间的 Akka 和状态

scala - Akka 流卡夫卡: No configuration setting found for key 'kafka-clients'

使用 C 在 Windows 中更改线程权限

c++ - 多线程服务器

Scala dropWhile 与过滤器

python - 从子线程停止主线程

java - 为什么在我将组件添加到 JFrame 而不是将它们添加到 JPanel 后,所有这些组件都消失了?