messaging - ZeroMq 路由器静默丢弃消息

标签 messaging zeromq jeromq

我有一个服务器(ROUTER 套接字),它绑定(bind)并允许单个客户端(DEALER 套接字)连接到它。然后服务器开始发送数据。

理想情况下,我想知道路由器何时达到其 hwm 设置并开始丢弃消息。我已在路由器上将 ZMQ_ROUTER_MANDATORY 设置为 1,但这也无济于事。即使我故意没有启动客户端,路由器也会继续报告消息已发送(isAlive = false,因此在另一端没有任何东西可以拉出这些消息)。

我做错了什么还是 HWM 设置在 ROUTER 套接字上根本不可靠?

我在 Windows 7 64 位上使用 jeromq 版本 0.3.1 和 jdk 1.6.0_32

谢谢

public final class SenderSocket implements Runnable{

    private final int total;
    private final int sentHwm;
    private final String address;
    private final Socket sendSocket;
    private final ExecutorService executor;

    private final static String NAME        = SenderSocket.class.getSimpleName( );
    private final static Logger LOGGER      = LoggerFactory.getLogger( NAME );


    public SenderSocket( ZContext context, String address, int sentHwm, int total ){
        this.address        = address;
        this.total          = total;
        this.sentHwm        = sentHwm;
        this.sendSocket     = context.createSocket( ZMQ.ROUTER );
        this.executor       = Executors.newSingleThreadExecutor( );
    }


    public void init( ){

        sendSocket.setSndHWM( sentHwm );
        sendSocket.setRouterMandatory( true );
        sendSocket.bind( address );

        executor.execute( this );
        LOGGER.info("ROUTER configured with HWM {} bound to {}.", sentHwm, address );

    }



    @Override
    public void run( ){         

        for( int i =0; i <total; i++ ){      

            try{

                String item     = new StringBuilder(8).append(i).toString();
                boolean result  = sendSocket.send( item );

                LOGGER.info("SENT>> [{}] [{}]", result, item );

            }catch( ZMQException zmqEx ){

                int errorCode = zmqEx.getErrorCode();

                if( ZError.EHOSTUNREACH == errorCode ){
                    LOGGER.warn("Attempted to send message to but dealer is DOWN!");
                }

                if( ZMQ.Error.ETERM.getCode() == errorCode ){
                    LOGGER.error("Received error code [{}], terminating.");
                    stop();
                }

                LOGGER.error("ZMQException while sending message.", zmqEx);

            }catch( Exception ex ){
                LOGGER.error("Exception while sending message.", ex );
            }

        }

        stop();

    }


    public void stop( ){
        sendSocket.setLinger( 0 );
    }


}

//客户
    public class ReceiverSocket implements Runnable{

        private final int hwm;
        private final String address;
        private final Socket recvSocket;
        private final ExecutorService executor;

        private volatile boolean isAlive;

        private final static String NAME        = ReceiverSocket.class.getSimpleName( );
        private final static Logger LOGGER      = LoggerFactory.getLogger( NAME );


        public ReceiverSocket( ZContext context, String address, int hwm ){
            this.address        = address;
            this.hwm            = hwm;
            this.recvSocket     = context.createSocket( ZMQ.DEALER );
            this.executor       = Executors.newSingleThreadExecutor( );
        }


        public void init( ){

            this.isAlive = false;

            recvSocket.setRcvHWM( hwm );
            recvSocket.connect( address );
            executor.execute( this );

            LOGGER.info("DEALER configured with HWM {} connected to {}.", hwm, address );

        }



        @Override
        public void run( ){         

            Poller poller       = new Poller( 1 );
            poller.register( recvSocket, Poller.POLLIN );

            while(  isAlive ){      

                try{

                    int pollCount       = poller.poll( );

                    if( pollCount == NEGATIVE_ONE ){
                        LOGGER.warn("ERROR! Was the thread interrupted?", pollCount );
                        isAlive = false;
                        return;
                    }

                    if( poller.pollin( ZERO ) ){
                        String data = recvSocket.recvStr( );
                        LOGGER.info("RECVD >> {} {}", data, NEWLINE );
                    }

                }catch( Exception e ){
                    LOGGER.error("Exception while receving message.", e);
                }

            }

        }


        public void stop( ){
            recvSocket.setLinger( 0 );
            LOGGER.info("{} Stopped!", NAME );
        }
}

//主要的
public static void main( String[ ] args ) throws InterruptedException{

        int recvHwm          = 5;
        int sentHwm          = 5;
        int totalSent        = 5000;
        String address       = "tcp://*:20000";
        ZContext context     = new ZContext( 1 );

        ReceiverSocket recvr = new ReceiverSocket( context, address, recvHwm );
        SenderSocket sender  = new SenderSocket( context, address, sentHwm, totalSent );

        recvr.init();
        Thread.sleep( 1000 );

        sender.init();

    }

最佳答案

路由器强制和高水位线无关。

I have set the ZMQ_ROUTER_MANDATORY to 1 on the router but that doesn't help either. The router continues to report that the messages are sent even though I deliberatly didn't start the client



即使没有对等点连接到路由器,路由器也不会引发异常,除非您为特定客户端 ID 处理消息。
//#1 no exception raised here, message dropped silently
rtr.setRouterMandatory(true)
rtr.bind("tcp://*:20000")
rtr.send("omg!")

//#2 exception raised here
rtr.setRouterMandatory(true)
rtr.bind("tcp://*:20000")
rtr.sendMore("client1")
rtr.sendMore("")
rtr.send("omg!")

代码示例 #2 引发异常,因为您告诉路由器将“omg”发送给身份为 client1 的对等方。 .路由器套接字通过为每个连接的对等方分配一个随机标识来跟踪所有连接。如果路由器没有连接 client1 , 或者,如果 client1之前断开连接,路由器将在这两种情况下引发异常。

您可以在客户端分配一个身份来覆盖路由器的随机身份分配:
client.setIdentity("client1".getBytes())
client.connect("tcp://*:20000")

上面的代码阻止路由器套接字在示例 #2 中抛出异常

我建议阅读 this ,它解释了消息寻址和封装;了解它的工作原理对于使用路由器套接字至关重要。

关于messaging - ZeroMq 路由器静默丢弃消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20148347/

相关文章:

c++ - linux高性能消息在c++中的线程之间传递

ZeroMQ 与 Crossroads I/O

flask - 中断 iPythonNotebook 中的 Flask 应用程序导致 ZMQerror

java - 路由器/经销商代理不传送消息

java - 使用 java jeromq 的异步客户端/服务器

java - 多线程之间的通信

java - 在非分布式应用程序中使用消息队列是否有意义

c++ - 消息包头的设计模式

FFmpeg CLI - 使用 ZMQ (zmqsend) 交换 RTMP 源

java - 调用 socket.bind() 时应用程序关闭 - Android 上的 JeroMQ