akka.net actor并行执行

标签 akka.net

我们正在 akka.net 上进行 POC 来处理 json 文件。我正在努力寻找批处理 JArray 的最佳方法。在我的实现中,akka 协调器 actor 收到以下消息:

//coordinator actor receive
public class ValidatedInput
{
public JArray Data { get; set; }
}

我的协调器 Actor 可以一次性处理完整的 JArray,如下所示,但我正在努力启动多个并行 Actor,每个 Actor 都会处理 JArray 中的 50 条记录。

//coordinator actor receives messages and calls transform actor to process
public void Receiving()
{
Receive<ValidatedInput>(x =>
{
TransformerRouter.Tell(x);
});
}

//transform actor receives message and process, sample code
Receive<ValidatedInput>(x =>
{
PipeToSupport.PipeTo<TransformResult>(MapDataAsync(x).ContinueWith(data =>
{
return new TransformResult();}), Self);
});

有没有像下面这样的方法,我可以传递 50 个 JArray 记录以供每个参与者处理并收集结果,例如:

Receive<ValidatedInputDataResult>(
{
TransformerRouter.Tell(x.Data.Take(50);
});

最佳答案

有一段时间没有使用 Akka.NET,但是当我使用 Akka.NET 时,我总是尽可能避免传递集合,主要有两个原因:

  • 您可以发送给参与者的消息大小有限制,尽管可以增加此限制 this isn't recommended .

  • 发送给 actor 的所有消息都会被序列化,然后在 Receive<> 时反序列化。 'd,这意味着如果您在消息中发送数组或其他对象集合,则每次使用 Tell 时,您都会面临在大对象堆上分配它们的风险。方法,如果这是热代码路径,则应尽可能避免使用该方法。

我当时解决此类问题的方法是:

  • 拥有一位“顶级”协调 Actor :
    • 包含一个路由器,工作参与者位于其后面。例如,您可以将路由器配置为 distribute messages in a round-robin fashion .
    • 每次出现新消息时都会生成一个“聚合器”参与者 Receive 'd, worker Actor 将把他们的结果发送到那里。您可以使用 Tell方法并传递聚合器的参与者引用,以便工作人员将聚合器视为 Sender在他们的 Actor 背景中。
  • “顶级”参与者中的路由器配置为 automatically spawn more actors when needed
  • worker Actor 所做的无非是 Receive一条消息,处理它然后 TellSender在其上下文中。

请记住,这个建议可能不完整,因为我当时对使用 Actor 系统不是很“流利”,而且我已经大约 6 个月没有积极使用 Akka.NET 了,可能会有更好的完成您需要的方法。

我建议在 Google 上搜索“actor 系统模式”和“Scala actor 模式”,并阅读一些开源 Scala 项目源代码,这也会给您带来一些见解。

最后,一个避免 future 麻烦的提示:消息类型应该始终是不可变的。所以你的ValidatedInput应该看起来像这样:

public class ValidatedInput
{
    public readonly JArray Data { get; }

    public ValidatedInput(JArray data)
    {
        Data = data;
    }
}

或者更好:

public class ValidatedInput
{
    public readonly IReadOnlyList<JToken> Data { get; }

    public ValidatedInput(IReadOnlyList<JToken> data)
    {
        Data = data;
    }
}

希望这对您有所帮助,祝您好运!

关于akka.net actor并行执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41161811/

相关文章:

powershell - 为什么从 PowerShell 调用 F# 代码时收到 MissingMethodException?

c# - 我如何调整生产者 Actor 的速度,以便它不会向消费者 Actor 传达太多消息

c# - 通过Docker容器进行Akka.net远程处理:客户端随机无法连接到主机

azure - 将 Akka.net 集群八卦发送到 Azure 辅助角色控制台

c# - Akka.net vs Orleans 表现

c# - Akka.NET 无法识别我的自定义记录器并默认为 BusLogger

f# - 如何在 F# 中使用 Akka.Streams.*.ConcatMany?

c# - Akka.Net 远程处理 : ActorSelection vs. IActorRef

.net - Akka.Net 和缓存一致性

c# - 父角色如何跟踪子角色执行的子任务的状态