我正在练习 Akka 路由,并在 Akka 文档中找到了这段代码。
路由器类别:
public class Router extends AbstractActor {
private List<Routee> routees = new ArrayList<Routee>();
akka.routing.Router router;
{
for(int i=1;i<=5;i++) {
ActorRef actor = getContext().actorOf(Props.create(Actor.class));
getContext().watch(actor);
routees.add(new ActorRefRoutee(actor));
System.out.println("Routee added");
}
router = new akka.routing.Router(new RoundRobinRoutingLogic(), routees);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Work.class, work -> {
router.route(work, getSender());
})
.match(Terminated.class, terminated -> {
System.out.println("Got actor terminated message");
router.removeRoutee(new ActorRefRoutee(terminated.actor()));
ActorRef actor = getContext().actorOf(Props.create(Actor.class));
getContext().watch(actor);
router.addRoutee(new ActorRefRoutee(actor));
System.out.println("Routee added back");
})
.build();
}
}
Actor 类别:
public class Actor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Work.class, work -> {
System.out.println("Work message received");
getContext().stop(getSelf());
})
.build();
}
}
Router
类创建 Actor
类的五个实例。实例存储在列表中。我在配置路由器时对其应用了 RoundRobinRoutingLogic
。
当我发送五条或更少的消息时,它工作得很好。但当消息数量超过 5 条时,就会显示死信错误。为什么?每次停止路线时,我都会添加一个新路线。谁能帮我解决这个问题吗?
最佳答案
Actor 的启动和停止以及Termulated
消息的生成都是异步发生的。在您的问题上下文中,这意味着您无法保证在当前设置下,在路由器收到第六条消息之前,新的路由已添加到路由器中。可能发生的情况是,路由器处理消息的速度太快,以至于当第六条 Work
消息到达时,还没有可用的路由。
顺便说一句,将类命名为“Router”和“Actor”非常令人困惑,因为 Akka API 已经使用了这些词。
关于java - Akka 路由器实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49317285/