java - Akka 分布式 pub sub : Java implementation not working

标签 java akka akka-cluster

订阅者的主类:Application.java

package com.mynamespace;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

import com.mynamespace.actors.SubscriberActor;

@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")   
public class Application {

    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(Application.class, args);
        // get hold of the actor system
        ActorSystem system = ctx.getBean(ActorSystem.class);
        ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
        ActorRef subscriber = system.actorOf(
                Props.create(SubscriberActor.class), "subscriber");
       // subscribe to the topic named "content"
        mediator.tell(new DistributedPubSubMediator.Put(subscriber), subscriber);
        // subscriber.tell("init", null);
        System.out.println("Running.");
        Thread.sleep(5000l);
    }
}

订阅者 Actor :SubscriberActor.java

package com.mynamespace.actors;

import java.util.ArrayList;
import java.util.List;

import akka.actor.UntypedActor;

import com.mynamespace.message.CategoryServiceRequest;
import com.mynamespace.message.CategoryServiceResponse;

public class SubscriberActor extends UntypedActor {

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof CategoryServiceRequest) {
            System.out.println("Request received for GetCategories.");
            CategoryServiceResponse response = new CategoryServiceResponse();
            List<String> categories = new ArrayList<>();
            categories.add("Food");
            categories.add("Fruits");
            response.setCatgories(categories);
            getSender().tell(response, getSelf());
        } else if (msg instanceof String && msg.equals("init")) {
            System.out.println("init called");
        } else {
            System.out
                .println("Unhandelled message received for getCategories.");
        }
    }

}

订阅者的Application.conf

akka {
    loglevel = INFO
    stdout-loglevel = INFO
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }

    remote {
       enabled-transports = ["akka.remote.netty.tcp"]
       netty.tcp {
         hostname = "127.0.0.1"
         port = 0
       }
     }

     cluster {
    seed-nodes = [
      "akka.tcp://mynamespace-actor-system@127.0.0.1:2551",
      "akka.tcp://mynamespace-actor-system@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
    }

}

发布者的主类:Application.java

package com.mynamespace;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

import com.mynamespace.actors.PublisherActor;

@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(Application.class, args);
        // get hold of the actor system
        ActorSystem system = ctx.getBean(ActorSystem.class);
        ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
        ActorRef publisher = system.actorOf(Props.create(PublisherActor.class),
            "publisher");
        mediator.tell(new DistributedPubSubMediator.Put(publisher), publisher);
        Thread.sleep(5000);
        publisher.tell("hi", publisher);
        System.out.println("Running.");
    }
}

PublisherActor.java

package com.mynamespace.actors;

import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import akka.util.Timeout;

import com.mynamespace.message.CategoryServiceRequest;
import com.mynamespace.message.CategoryServiceResponse;

public class PublisherActor extends UntypedActor {

    // activate the extension
    ActorRef mediator = DistributedPubSubExtension.get(getContext().system())
        .mediator();

    public void onReceive(Object msg) {
        if (msg instanceof String) {
            Timeout timeOut = new Timeout(50000l);
            mediator.tell(new DistributedPubSubMediator.Send(
                    "/user/subscriber", new CategoryServiceRequest()),
                    getSelf());
            Future<Object> response = Patterns.ask(mediator,
                    new DistributedPubSubMediator.Send("/user/subscriber",
                            new CategoryServiceRequest()), timeOut);
            Future<CategoryServiceResponse> finalresponse = response.map(
                    new Mapper<Object, CategoryServiceResponse>() {

                        @Override
                        public CategoryServiceResponse apply(Object parameter) {
                            CategoryServiceResponse responseFromRemote = (CategoryServiceResponse) parameter;
                            System.out.println("received:: list of size:: "
                                + responseFromRemote.getCatgories().size());
                            return responseFromRemote;
                        }

                    }, getContext().system().dispatcher());
        } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
            System.out.println("subscribbed.......");

        } else {
            unhandled(msg);
        }
    }
}

发布者的应用程序配置与订阅者的相同。两者都在同一系统的不同端口上运行。

我在本地系统上定义并运行了两个种子节点。不知何故,我无法通过 DistributedPubSub 调解器向生产者(均在不同节点上运行)询问/告知订阅者。

在运行 Subscriber 然后是 publisher 之后:我没有在 stdout/logs 中打印任何异常或任何死信引用。

是否可以查看我的调解员持有哪些 Actor 引用资料?

需要帮助来发现问题或可能的问题。

最佳答案

我遇到了同样的问题,在@spam 的评论和我自己的实验之后,我可以推荐的是对组使用发布/订阅和 sendOneMessageToEachGroup=true

是否假设 Send 仅在本地有效?如果是这样,文档没有明确说明。但我也可以通过那里的代码看出文档的这个特定部分显然被忽略了(如更改类名但不调用那些,调用前面示例中的前面的)

希望这对遇到此问题的任何人有所帮助,因为文档显然有点误导

关于java - Akka 分布式 pub sub : Java implementation not working,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31536080/

相关文章:

scala - 选择Akka还是Spark进行并行处理?

akka - 您是否使用 Akka Cluster 在分布式系统中的每个主机上运行单独的 Actor 系统?

java - Java 中的泛型方法返回参数

java - 是否可以在 C :/Windows/System32/location at runtime? 中放置一个 myfile.file

scala - Akka log-dead-letters 和 log-dead-letters-during-shutdown 不起作用

使用 Akka 进行 Scala Testkit 单元测试

java - 如何测试config.properties文件的 "key"和 "value"?

java - 无法显示选项卡式 JTable 的表头

json - 我应该使用spray 还是play 来进行高度可扩展和高效的rest json 处理?

scala - 如何在 Akka 集群中运行 Akka-HTTP 服务器?