java - ZeroMQ,我们可以使用 inproc : transport along with pub/sub messaging pattern

标签 java zeromq publish-subscribe event-driven-design jeromq

场景:

我们正在评估 ZeroMQ(特别是 jeroMq)的事件驱动机制。

应用程序是分布式的,其中多个服务(发布者和订阅者都是服务)可以存在于同一个 jvm 中或不同的节点中,这取决于部署架构。

观察

为了尝试,我创建了一个 pub/sub 模式,使用 inproc: 作为传输,使用 jero mq(版本:0.3.5)

  1. 线程发布是可以发布的(看起来是发布了,至少没有错误)
  2. 在另一个线程中的订阅者没有收到任何东西。

问题

inproc:pub/sub 一起使用是否可行?

尝试谷歌搜索但找不到任何具体的信息,任何见解?

pub/sub 的代码示例,带有 inproc:

使用 jero mq(版本:0.3.5)的 inproc pub sub 的工作代码示例,对以后访问这篇文章的人很有用。一个发布者发布主题 AB,两个订阅者接收 AB 分别

/**
 * @param args
 */
public static void main(String[] args) {

    // The single ZMQ instance
    final Context context = ZMQ.context(1);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startPublishing(context);
        }
    });
    //Subscriber for topic "A"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startFirstSubscriber(context);
        }
    });
    // Subscriber for topic "B"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startSecondSubscriber(context);
        }
    });

}

/**
 * Prepare the publisher and publish
 * 
 * @param context
 */
private static void startPublishing(Context context) {

    Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("inproc://test");
    while (!Thread.currentThread().isInterrupted()) {
        // Write two messages, each with an envelope and content
        try {
            publisher.sendMore("A");
            publisher.send("We don't want to see this");
            LockSupport.parkNanos(1000);
            publisher.sendMore("B");
            publisher.send("We would like to see this");
        } catch (Throwable e) {

            e.printStackTrace();
        }
    }
    publisher.close();
    context.term();
}

/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startFirstSubscriber(Context context) {

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");

    subscriber.subscribe("B".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber1 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

/**
 * Prepare and receive though the subscriber
 * 
 * @param context
 */
private static void startSecondSubscriber(Context context) {
    // Prepare our context and subscriber

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");
    subscriber.subscribe("A".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber2 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

最佳答案

The ZMQ inproc transport旨在用于不同线程之间的单个进程内。当您说“可以存在于相同的 jvm 或不同的节点中”(强调我的)时,我假设您的意思是您正在将多个进程作为分布式服务而不是单个进程中的多个线程.

如果是这样,那么不,您尝试执行的操作将无法与 inproc 一起使用。 PUB-SUB/inproc 可以在多个线程之间的单个进程中正常工作。


编辑以解决评论中的其他问题:

使用像 inprocipc 这样的传输的原因是当你在正确的上下文中时,它比 tcp 传输更有效(更快)使用它们。可以想象,您可以混合使用多种传输方式,但您始终必须在同一传输方式上进行绑定(bind)和连接才能使其正常工作。

这意味着每个节点最多需要三个 PUBSUB 套接字 - 一个 tcp 发布者与远程主机上的节点对话,一个 ipc 发布者与同一主机上不同进程中的节点对话,一个 inproc 发布者与同一进程中不同线程中的节点对话。

实际上,在大多数情况下,您只需使用 tcp 传输并且只为所有内容启动一个套接字 - tcp 无处不在。如果每个套接字负责特定种类的信息,则启动多个套接字可能是有意义的。

如果您总是向其他线程发送一种消息类型并向其他主机发送不同的消息类型是有原因的,那么多个套接字是有意义的,但在您的情况下,从一个节点的角度来看,所有其他节点都是平等的。在那种情况下,我会在所有地方使用 tcp 并完成它。

关于java - ZeroMQ,我们可以使用 inproc : transport along with pub/sub messaging pattern,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35453816/

相关文章:

zeromq - 如何检查zeromq发送数据是否成功

python - 通过 zeromq pyzmq 发送一个数字

php - 如何使用 Redis Pub/Sub 在多个工作人员之间映射工作?

ip - 获取 RTI DDS 上 DataWriter/Publisher 的 IP 地址?

java - CollapsingToolbarLayout 无法识别滚动 throw

java - 从线程内部调用时 Spring 数据存储库 find 方法挂起

getConnection() 上的 java.util.NoSuchElementException

java - 循环完成后程序未终止

messaging - ZeroMQ 准备好用于生产了吗?

具有发布者和订阅者特征的 Scala 类