java - ZeroMQ 产生的结果微乎其微

标签 java message-queue zeromq jeromq

我正在测试 ZeroMQ,每秒只能收到大约 1227 - 1276 条消息。然而我读到这些应该超过这个数量的 100 倍。

我做错了什么?我可以指定一些配置来解决这个问题吗?

我正在使用以下功能:

public static final String SERVER_LOCATION = "127.0.0.1";
public static final int SERVER_BIND_PORT = 5570;

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
    ZContext ctx = new ZContext();

    Socket frontend = ctx.createSocket(ZMQ.PULL);
    frontend.bind("tcp://*:"+SERVER_BIND_PORT);

    int i = 1;
    do{
        ZMsg msg = ZMsg.recvMsg(frontend);
        ZFrame content = msg.pop();
        if(content!= null){
            msg.destroy();
            System.out.println("Received: "+i);
            i++;
            content.destroy();
        }
    }while(true);
}

public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PUSH);

    client.setIdentity("i".getBytes());
    client.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);

    PollItem[] items = new PollItem[] { new PollItem(client, Poller.POLLIN) };
    int i = 1;
    Timer t = new Timer(timeToSpendSending);
    t.start();
    do{
        client.send(/* object to send*/ , 0);
        i++;
    }while(!t.isDone());

    System.out.println("Done with "+i);
}

用于限制程序运行时间的定时器类:

class Timer extends Thread{
    int time;
    boolean done;
    public Timer(int time){
        this.time = time;
        done = false;
    }
    public void run(){
        try {
            this.sleep(time);
            done = true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public boolean isDone(){
        return done;
    }
}

编辑:我正在使用 jeroMQ

<dependency>
    <groupId>org.zeromq</groupId>
    <artifactId>jeromq</artifactId>
    <version>0.3.4</version>
</dependency>

最佳答案

我必须更换连接方法并删除高水位线(设置为 0 以表示内存中的消息不受限制)

代码如下:

public static final String SERVER_LOCATION = "127.0.0.1";
public static final int SERVER_BIND_PORT = 5570;
public static final String TOPIC = "topic1";

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
    // Prepare our context and subscribe
       Context context = ZMQ.context(1);
       Socket subscriber = context.socket(ZMQ.SUB);

       subscriber.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);
       subscriber.setRcvHWM(0);
       subscriber.subscribe(TOPIC.getBytes());
       System.out.println("subscribed to  "+TOPIC);
       int i = 1;
       boolean started = false;
       Timer t = new Timer(timeToSpendSending);
       do{
           String msg = subscriber.recvStr();
           if(!TOPIC.equals(msg)){
               if(!started){
                   t.start();
                   started = true;
               }
               i++;
           }
       }while(!t.isDone());
       System.out.println("Done with: "+i);
       subscriber.close();
       context.term();
   }
   public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
       Context context = ZMQ.context(1);
       Socket publisher = context.socket(ZMQ.PUSH);
       publisher.bind("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);
       publisher.setHWM(0);
       publisher.setSndHWM(0);

       int i = 1;
       Timer t = new Timer(timeToSpendSending);
       t.start();
       do{
          publisher.sendMore(TOPIC);
          publisher.send("Test Data number "+i);
          i++;
      }while(!t.isDone());
      System.out.println("Done with: "+i);
      publisher.close();
      context.term();
   }

像这样,我收到的消息计数范围为发送时每秒 250,000 条,接收时每秒 145,000 条。

关于java - ZeroMQ 产生的结果微乎其微,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25884948/

相关文章:

Java套接字问题

java - Java 的 sql.Date 会产生 off by one 错误吗?

asynchronous - 实现可靠异步消息访问的服务版本控制的权衡?

javascript - 如何让 ZMQ 路由器在繁忙时引发错误?

c++ - c++中面向性能的消息回调解决方案

java - Google docs api -- 上传 XML 文档

java - Android afterTextChanged获取EditText标签

network-programming - Apache ActiveMQ 与原始套接字的性能开销是多少?

message-queue - HornetQ 核心 API 和 JMS

python - 在死服务器上使用 ZeroMQ 时终止 python 脚本