我们正在 akka.net 上进行 POC 来处理 json 文件。我正在努力寻找批处理 JArray 的最佳方法。在我的实现中,akka 协调器 actor 收到以下消息:
//coordinator actor receive
public class ValidatedInput
{
public JArray Data { get; set; }
}
我的协调器 Actor 可以一次性处理完整的 JArray,如下所示,但我正在努力启动多个并行 Actor,每个 Actor 都会处理 JArray 中的 50 条记录。
//coordinator actor receives messages and calls transform actor to process
public void Receiving()
{
Receive<ValidatedInput>(x =>
{
TransformerRouter.Tell(x);
});
}
//transform actor receives message and process, sample code
Receive<ValidatedInput>(x =>
{
PipeToSupport.PipeTo<TransformResult>(MapDataAsync(x).ContinueWith(data =>
{
return new TransformResult();}), Self);
});
有没有像下面这样的方法,我可以传递 50 个 JArray 记录以供每个参与者处理并收集结果,例如:
Receive<ValidatedInputDataResult>(
{
TransformerRouter.Tell(x.Data.Take(50);
});
最佳答案
有一段时间没有使用 Akka.NET,但是当我使用 Akka.NET 时,我总是尽可能避免传递集合,主要有两个原因:
您可以发送给参与者的消息大小有限制,尽管可以增加此限制 this isn't recommended .
发送给 actor 的所有消息都会被序列化,然后在
Receive<>
时反序列化。 'd,这意味着如果您在消息中发送数组或其他对象集合,则每次使用Tell
时,您都会面临在大对象堆上分配它们的风险。方法,如果这是热代码路径,则应尽可能避免使用该方法。
我当时解决此类问题的方法是:
- 拥有一位“顶级”协调 Actor :
- 包含一个路由器,工作参与者位于其后面。例如,您可以将路由器配置为 distribute messages in a round-robin fashion .
- 每次出现新消息时都会生成一个“聚合器”参与者
Receive
'd, worker Actor 将把他们的结果发送到那里。您可以使用Tell
方法并传递聚合器的参与者引用,以便工作人员将聚合器视为Sender
在他们的 Actor 背景中。
- “顶级”参与者中的路由器配置为 automatically spawn more actors when needed
- worker Actor 所做的无非是
Receive
一条消息,处理它然后Tell
到Sender
在其上下文中。
请记住,这个建议可能不完整,因为我当时对使用 Actor 系统不是很“流利”,而且我已经大约 6 个月没有积极使用 Akka.NET 了,可能会有更好的完成您需要的方法。
我建议在 Google 上搜索“actor 系统模式”和“Scala actor 模式”,并阅读一些开源 Scala 项目源代码,这也会给您带来一些见解。
最后,一个避免 future 麻烦的提示:消息类型应该始终是不可变的。所以你的ValidatedInput
应该看起来像这样:
public class ValidatedInput
{
public readonly JArray Data { get; }
public ValidatedInput(JArray data)
{
Data = data;
}
}
或者更好:
public class ValidatedInput
{
public readonly IReadOnlyList<JToken> Data { get; }
public ValidatedInput(IReadOnlyList<JToken> data)
{
Data = data;
}
}
希望这对您有所帮助,祝您好运!
关于akka.net actor并行执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41161811/