java - Akka Actor 阻止消息

标签 java concurrency akka actor

嗨,因为我无法正确解决问题,这可能不是我的问题的正确标题,但情况是这样的;

Actor A 创建 Actor B1,Actor B1 创建“n”个 Actor 负责执行任务。 所以 A 是 B1 的父级,B1 是 b1、b2、b3、... 的父级。

在 A 中,我安排了一个自动收录器,以便 A 每 10 秒检查一次是否有新的 B' 要创建。

Duration duration = Duration.create(10 sec.);
FiniteDuration interval = new FiniteDuration(duration.toMillis(), TimeUnit.MILLISECONDS);
ticker = getContext().system().scheduler().schedule(interval, interval, getSelf(), new Tick() { }, getContext().dispatcher(), getSelf());

在前端,我可以调整“b”个任务的并行度。 例如,如果我将并行度设置为 3,则 B(1) 创建 3 个参与者,每个参与者执行一些任务 如果一个 Actor ,比方说 b(n),完成后 b(n+1) 就会被创建,依此类推。

问题是;

如果只有一个 Actor b(i=1) 是由 Actor “B'”创建的(B 并不重要),那么股票行情就会真正滴答作响 每 10 秒一次,但如果我将 b 的并行度增加到 64 b(i=64),那么股票行情就无法正常运行。 它等待相当长的时间,例如1分钟。然后连续滴答 6 次,就好像有刷新机制一样。

这并不是我增加系统上参与者数量时遇到的唯一问题。

我有一个 API,以便用户可以向参与者发送订单,如下所示

String path = ActorPaths.actorPathForPlan(plan);
ActorSelection actorSelection = runtimeInit.getSystem().actorSelection(path);
// ask
Timeout timeout = new Timeout(Duration.create(4*1000, TimeUnit.MILLISECONDS));
Future<Object> future = Patterns.ask(actorSelection, message, timeout);
// get result
return returnType.cast(Await.result(future, timeout.duration()));

当有超过大约 10 个参与者时,futures 总是会超时,但是当我调试代码时,我会看到消息 已收到,但经过相当长的时间。

所以,我想知道是什么阻止了我的 Actor A 接收消息。同样的问题也可能发生在 Actor B' 及其 child 身上 我还没有检查过,但如果我弄清楚了问题,我相信我可以将解决方案应用于其他人。

感谢您的任何建议。

层次结构是这样的

/image/PRmwE.png

最佳答案

默认情况下,所有 Akka Actor 使用相同的执行器,最多只能使用 64 个线程。来自 http://doc.akka.io/docs/akka/snapshot/general/configuration.html :

# This will be used if you have set "executor = "default-executor"".
      # If an ActorSystem is created with a given ExecutionContext, this
      # ExecutionContext will be used as the default executor for all
      # dispatchers in the ActorSystem configured with
      # executor = "default-executor". Note that "default-executor"
      # is the default value for executor, and therefore used if not
      # specified otherwise. If no ExecutionContext is given,
      # the executor configured in "fallback" will be used.
      default-executor {
        fallback = "fork-join-executor"
      }

      # This will be used if you have set "executor = "fork-join-executor""
      fork-join-executor {
        # Min number of threads to cap factor-based parallelism number to
        parallelism-min = 8

        # The parallelism factor is used to determine thread pool size using the
        # following formula: ceil(available processors * factor). Resulting size
        # is then bounded by the parallelism-min and parallelism-max values.
        parallelism-factor = 3.0

        # Max number of threads to cap factor-based parallelism number to
        parallelism-max = 64
      }

该问题可能与 b* actor 中的阻塞调用有关。 Akka 从 64 个线程池中分配单独的线程来处理 b* actor 中的这些阻塞调用,并等待其中一个线程完成消息处理,以便能够处理 A 和 B 的消息。

请参阅http://doc.akka.io/docs/akka/snapshot/general/actor-systems.html中的“阻止需要仔细管理”了解如何解决此问题。

关于java - Akka Actor 阻止消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26243347/

相关文章:

java - 关于 "Java Concurrency in Practice"示例的问题

c# - Correct Concurrent Collection 存储定时非循环结构

Scala、Play、Akka、Websocket : how to pass actor messages through websocket

akka - 一个集群可以有多个单例 Actor 吗?

java - Websocket 上的 Spring STOMP - "private"消息传递

java - JNI EnsureLocalCapacity——为什么?

java - 如何在 Mac OS 10.5.8 上运行的 Tomcat/java 上设置 UTF8 语言?

java - JTable和JComboBox的结合使用

ios - 在主线程中非法访问托管对象上下文,为什么?

java - 在Akka模型中共享Cassandra集群/ session