场景:
我们正在评估 ZeroMQ
(特别是 jeroMq
)的事件驱动机制。
应用程序是分布式的,其中多个服务(发布者和订阅者都是服务)可以存在于同一个 jvm 中或不同的节点中,这取决于部署架构。
观察
为了尝试,我创建了一个 pub
/sub
模式,使用 inproc:
作为传输,使用 jero mq(版本:0.3.5)
- 线程发布是可以发布的(看起来是发布了,至少没有错误)
- 在另一个线程中的订阅者没有收到任何东西。
问题
将 inproc:
与 pub
/sub
一起使用是否可行?
尝试谷歌搜索但找不到任何具体的信息,任何见解?
pub
/sub
的代码示例,带有 inproc:
使用 jero mq(版本:0.3.5)的 inproc pub sub 的工作代码示例,对以后访问这篇文章的人很有用。一个发布者发布主题 A
和 B
,两个订阅者接收 A
和 B
分别
/**
* @param args
*/
public static void main(String[] args) {
// The single ZMQ instance
final Context context = ZMQ.context(1);
ExecutorService executorService = Executors.newFixedThreadPool(3);
//Publisher
executorService.execute(new Runnable() {
@Override
public void run() {
startPublishing(context);
}
});
//Subscriber for topic "A"
executorService.execute(new Runnable() {
@Override
public void run() {
startFirstSubscriber(context);
}
});
// Subscriber for topic "B"
executorService.execute(new Runnable() {
@Override
public void run() {
startSecondSubscriber(context);
}
});
}
/**
* Prepare the publisher and publish
*
* @param context
*/
private static void startPublishing(Context context) {
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("inproc://test");
while (!Thread.currentThread().isInterrupted()) {
// Write two messages, each with an envelope and content
try {
publisher.sendMore("A");
publisher.send("We don't want to see this");
LockSupport.parkNanos(1000);
publisher.sendMore("B");
publisher.send("We would like to see this");
} catch (Throwable e) {
e.printStackTrace();
}
}
publisher.close();
context.term();
}
/**
* Prepare and receive through the subscriber
*
* @param context
*/
private static void startFirstSubscriber(Context context) {
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("B".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber1 " + address + " : " + contents);
}
subscriber.close();
context.term();
}
/**
* Prepare and receive though the subscriber
*
* @param context
*/
private static void startSecondSubscriber(Context context) {
// Prepare our context and subscriber
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("A".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber2 " + address + " : " + contents);
}
subscriber.close();
context.term();
}
最佳答案
The ZMQ inproc
transport旨在用于不同线程之间的单个进程内。当您说“可以存在于相同的 jvm 或不同的节点中”(强调我的)时,我假设您的意思是您正在将多个进程作为分布式服务而不是单个进程中的多个线程.
如果是这样,那么不,您尝试执行的操作将无法与 inproc
一起使用。 PUB-SUB/inproc
可以在多个线程之间的单个进程中正常工作。
编辑以解决评论中的其他问题:
使用像 inproc
或 ipc
这样的传输的原因是当你在正确的上下文中时,它比 tcp 传输更有效(更快)使用它们。可以想象,您可以混合使用多种传输方式,但您始终必须在同一传输方式上进行绑定(bind)和连接才能使其正常工作。
这意味着每个节点最多需要三个 PUB
或 SUB
套接字 - 一个 tcp
发布者与远程主机上的节点对话,一个 ipc
发布者与同一主机上不同进程中的节点对话,一个 inproc
发布者与同一进程中不同线程中的节点对话。
实际上,在大多数情况下,您只需使用 tcp
传输并且只为所有内容启动一个套接字 - tcp
无处不在。如果每个套接字负责特定种类的信息,则启动多个套接字可能是有意义的。
如果您总是向其他线程发送一种消息类型并向其他主机发送不同的消息类型是有原因的,那么多个套接字是有意义的,但在您的情况下,从一个节点的角度来看,所有其他节点都是平等的。在那种情况下,我会在所有地方使用 tcp
并完成它。
关于java - ZeroMQ,我们可以使用 inproc : transport along with pub/sub messaging pattern,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35453816/