java - Akka 路由器实现

标签 java java-8 akka actor

我正在练习 Akka 路由,并在 Akka 文档中找到了这段代码。

路由器类别:

public class Router extends AbstractActor {

    private List<Routee> routees = new ArrayList<Routee>();

    akka.routing.Router router;
    {

        for(int i=1;i<=5;i++) {
            ActorRef actor = getContext().actorOf(Props.create(Actor.class));
            getContext().watch(actor);
            routees.add(new ActorRefRoutee(actor));
            System.out.println("Routee added");
        }

        router = new akka.routing.Router(new RoundRobinRoutingLogic(), routees);

    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Work.class, work -> {
                    router.route(work, getSender());
                })
                .match(Terminated.class, terminated -> {

                    System.out.println("Got actor terminated message");

                    router.removeRoutee(new ActorRefRoutee(terminated.actor()));
                    ActorRef actor = getContext().actorOf(Props.create(Actor.class));
                    getContext().watch(actor);
                    router.addRoutee(new ActorRefRoutee(actor));

                    System.out.println("Routee added back");
                })
                .build();
    }
}

Actor 类别:

public class Actor extends AbstractActor {

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Work.class, work -> {
                    System.out.println("Work message received");
                    getContext().stop(getSelf());
                })
                .build();
    }
}

Router 类创建 Actor 类的五个实例。实例存储在列表中。我在配置路由器时对其应用了 RoundRobinRoutingLogic

当我发送五条或更少的消息时,它工作得很好。但当消息数量超过 5 条时,就会显示死信错误。为什么?每次停止路线时,我都会添加一个新路线。谁能帮我解决这个问题吗?

最佳答案

Actor 的启动和停止以及Termulated 消息的生成都是异步发生的。在您的问题上下文中,这意味着您无法保证在当前设置下,在路由器收到第六条消息之前,新的路由已添加到路由器中。可能发生的情况是,路由器处理消息的速度太快,以至于当第六条 Work 消息到达时,还没有可用的路由。

顺便说一句,将类命名为“Router”和“Actor”非常令人困惑,因为 Akka API 已经使用了这些词。

关于java - Akka 路由器实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49317285/

相关文章:

java - 然而Eclipse错误: Incompatible JVM,,无法更新JVM

JPA 和 java 8 日期 API - 选择正确的实现(Instant、LocalDateTime、ZonedDateTime)

scala - Akka Actor-等待一段时间,等待消息,否则发送消息

multithreading - 在实践中在哪里以及如何使用 Actor

unit-testing - 使用 Mockito 模拟 Akka Actor 日志对象

java - 如何使用 RESTEasy 显示浏览器登录表单以进行基本身份验证

java - 帮助打破 if/else 吗?

JAVA - 如果其中一个函数失败,如何优雅地继续 while 循环?

java - 我如何总结两个 map 中的值并使用 Guava 返回值

java-8 - GeoServer 可以在 Tomcat 10.x 上运行吗?