java - Netty客户端有时收不到所有预期的消息

标签 java networking netty

我有一个相当简单的测试 Netty 服务器/客户端项目。我正在测试通信稳定性的某些方面,方法是向服务器发送大量消息,并对返回的消息和字节进行计数,以确保一切都匹配。

当我从客户端运行洪水时,客户端会跟踪它发送的消息数量以及返回的消息数量,然后当数量彼此相等时,它会打印出一些统计信息。

在本地运行时的某些情况下(我猜是因为拥塞?)客户端永远不会打印出最终消息。当这两个组件位于远程计算机上时,我没有遇到此问题。任何建议将不胜感激:

编码器只是一个简单的 OneToOneEncoder,它将 Envelope 类型编码为 ChannelBuffer,而解码器是一个简单的 ReplayDecoder,执行相反的操作。

我尝试向客户端处理程序添加 ChannelInterestChanged 方法,以查看 channel 的兴趣是否更改为不读取,但情况似乎并非如此。

相关代码如下:

谢谢!

服务器

    public class Server {

    // configuration --------------------------------------------------------------------------------------------------
    private final int port;
    private ServerChannelFactory serverFactory;
    // constructors ---------------------------------------------------------------------------------------------------

    public Server(int port) {
        this.port = port;
    }


    // public methods -------------------------------------------------------------------------------------------------
    public boolean start() {
        ExecutorService bossThreadPool = Executors.newCachedThreadPool();
        ExecutorService childThreadPool = Executors.newCachedThreadPool();

        this.serverFactory = new NioServerSocketChannelFactory(bossThreadPool, childThreadPool);
        this.channelGroup = new DeviceIdAwareChannelGroup(this + "-channelGroup");
        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("encoder", Encoder.getInstance());
                pipeline.addLast("decoder", new Decoder());
                pipeline.addLast("handler", new ServerHandler());
                return pipeline;
            }
        };

        ServerBootstrap bootstrap = new ServerBootstrap(this.serverFactory);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        Channel channel = bootstrap.bind(new InetSocketAddress(this.port));
        if (!channel.isBound()) {
            this.stop();
            return false;
        }

        this.channelGroup.add(channel);
        return true;
    }

    public void stop() {
        if (this.channelGroup != null) {
            ChannelGroupFuture channelGroupCloseFuture = this.channelGroup.close();
            System.out.println("waiting for ChannelGroup shutdown...");
            channelGroupCloseFuture.awaitUninterruptibly();
        }
        if (this.serverFactory != null) {
            this.serverFactory.releaseExternalResources();
        }
    }

    // main -----------------------------------------------------------------------------------------------------------
    public static void main(String[] args) {
        int port;
        if (args.length != 3) {
            System.out.println("No arguments found using default values");
            port = 9999;
        } else {
            port = Integer.parseInt(args[1]);
        }

        final Server server = new Server( port);

        if (!server.start()) {
            System.exit(-1);
        }
        System.out.println("Server started on port 9999 ... ");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                server.stop();
            }
        });
    }
}

服务器处理程序

 public class ServerHandler extends SimpleChannelUpstreamHandler {

    // internal vars --------------------------------------------------------------------------------------------------

    private AtomicInteger numMessagesReceived=new AtomicInteger(0);

    // constructors ---------------------------------------------------------------------------------------------------
    public ServerHandler() {
    }

    // SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Channel c = e.getChannel();
        System.out.println("ChannelConnected: channel id: " + c.getId() + ", remote host: " + c.getRemoteAddress() + ", isChannelConnected(): " + c.isConnected());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("*** EXCEPTION CAUGHT!!! ***");
        e.getChannel().close();
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("*** CHANNEL DISCONNECTED ***");

    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if(numMessagesReceived.incrementAndGet()%1000==0 ){
             System.out.println("["+numMessagesReceived+"-TH MSG]: Received message: " + e.getMessage());
        }

        if (e.getMessage() instanceof Envelope) {
                // echo it...
                if (e.getChannel().isWritable()) {
                    e.getChannel().write(e.getMessage());
                }
        } else {
            super.messageReceived(ctx, e);
        }
    }
}

客户端

public class Client implements ClientHandlerListener {

    // configuration --------------------------------------------------------------------------------------------------
    private final String host;
    private final int port;
    private final int messages;
    // internal vars --------------------------------------------------------------------------------------------------
    private ChannelFactory clientFactory;
    private ChannelGroup channelGroup;
    private ClientHandler handler;
    private final AtomicInteger received;
    private long startTime;
    private ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    // constructors ---------------------------------------------------------------------------------------------------
    public Client(String host, int port, int messages) {
        this.host = host;
        this.port = port;
        this.messages = messages;
        this.received = new AtomicInteger(0);
    }

    // ClientHandlerListener ------------------------------------------------------------------------------------------
    @Override
    public void messageReceived(Envelope message) {
        if (this.received.incrementAndGet() == this.messages) {
            long stopTime = System.currentTimeMillis();
            float timeInSeconds = (stopTime - this.startTime) / 1000f;
            System.err.println("Sent and received " + this.messages + " in " + timeInSeconds + "s");
            System.err.println("That's " + (this.messages / timeInSeconds) + " echoes per second!");
        }
    }

    // public methods -------------------------------------------------------------------------------------------------
    public boolean start() {

        // For production scenarios, use limited sized thread pools
        this.clientFactory = new NioClientSocketChannelFactory(cachedThreadPool, cachedThreadPool);
        this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");
        this.handler = new ClientHandler(this, this.channelGroup);
        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("byteCounter", new ByteCounter("clientByteCounter"));
                pipeline.addLast("encoder", Encoder.getInstance());
                pipeline.addLast("decoder", new Decoder());
                pipeline.addLast("handler", handler);
                return pipeline;
            }
        };

        ClientBootstrap bootstrap = new ClientBootstrap(this.clientFactory);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        boolean connected = bootstrap.connect(new InetSocketAddress(host, port)).awaitUninterruptibly().isSuccess();
        System.out.println("isConnected: " + connected);
        if (!connected) {
            this.stop();
        }

        return connected;
    }

    public void stop() {
        if (this.channelGroup != null) {
            this.channelGroup.close();
        }
        if (this.clientFactory != null) {
            this.clientFactory.releaseExternalResources();
        }
    }

    public ChannelFuture sendMessage(Envelope env) {
        Channel ch = this.channelGroup.iterator().next();
        ChannelFuture cf = ch.write(env);
        return cf;
    }

    private void flood() {
        if ((this.channelGroup == null) || (this.clientFactory == null)) {
            return;
        }

        System.out.println("sending " + this.messages + " messages");
        this.startTime = System.currentTimeMillis();
        for (int i = 0; i < this.messages; i++) {

            this.handler.sendMessage(new Envelope(Version.VERSION1, Type.REQUEST, 1, new byte[1]));
        }
    }
    // main -----------------------------------------------------------------------------------------------------------

    public static void main(String[] args) throws InterruptedException {
        final Client client = new Client("localhost", 9999, 10000);

        if (!client.start()) {
            System.exit(-1);
            return;
        }
        while (client.channelGroup.size() == 0) {
            Thread.sleep(200);
        }
        System.out.println("Client started...");

        client.flood();


        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("shutting down client");
                client.stop();
            }
        });


    }
}

客户处理程序

public class ClientHandler extends SimpleChannelUpstreamHandler {
    // internal vars --------------------------------------------------------------------------------------------------
    private final ClientHandlerListener listener;
    private final ChannelGroup channelGroup;
    private Channel channel;

    // constructors ---------------------------------------------------------------------------------------------------
    public ClientHandler(ClientHandlerListener listener, ChannelGroup channelGroup) {
        this.listener = listener;
        this.channelGroup = channelGroup;
    }

    // SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if (e.getMessage() instanceof Envelope) {
            Envelope env = (Envelope) e.getMessage();
            this.listener.messageReceived(env);
        } else {
            System.out.println("NOT ENVELOPE!!");
            super.messageReceived(ctx, e);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("**** CAUGHT EXCEPTION CLOSING CHANNEL ***");
        e.getCause().printStackTrace();
        e.getChannel().close();
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.channel = e.getChannel();
        System.out.println("Server connected, channel id: " + this.channel.getId());
        this.channelGroup.add(e.getChannel());
    }

    // public methods -------------------------------------------------------------------------------------------------
    public void sendMessage(Envelope envelope) {
        if (this.channel != null) {
            this.channel.write(envelope);
        }
    }
}

客户端处理程序监听器接口(interface)

public interface ClientHandlerListener {

    void messageReceived(Envelope message);
}

最佳答案

如果不知道网络上的信封有多大,我会猜测您的问题是您的客户端写入 10,000 条消息而不检查 channel 是否可写。

Netty 3.x 处理网络事件并以特定方式写入。您的客户端可能会以如此快的速度写入如此多的数据,以至于 Netty 没有机会处理接收事件。在服务器端,这将导致 channel 变得不可写并且您的处理程序丢弃回复。

您在本地主机上看到问题的原因有多种,但这可能是因为写入带宽远高于您的网络带宽。客户端不会检查 channel 是否可写,因此通过网络,您的消息将由 Netty 缓冲,直到网络能够跟上(如果您写入的消息明显超过 10,000 条,您可能会看到 OutOfMemoryError)。这是一个自然中断,因为 Netty 将暂停写入,直到网络准备好,从而允许它处理传入数据并防止服务器看到不可写的 channel 。

DiscardClientHandler丢弃处理程序中显示了如何测试 channel 是否可写,以及如何在 channel 再次可写时恢复。另一种选择是让 sendMessage 返回与写入关联的 ChannelFuture,如果写入后 channel 不可写,则阻塞直到 future 完成。

您的服务器处理程序还应该写入消息,然后检查 channel 是否可写。如果不是,您应该将 channel 可读设置为 false。当 channel 再次变为可写时,Netty 将通知 ChannelInterestChanged。然后您可以将channel Readable 设置为true 以继续读取消息。

关于java - Netty客户端有时收不到所有预期的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12763800/

相关文章:

java - Netty 客户端的 MVC 前端

java - eclipse R.java 无法打开类文件

java - JPA 仅选择每个项目的最近日期

Java - 连接到亚马逊

linux - Ubuntu : Reset default routing table

java - 无法从类 io.netty.channel.sctp.nio.NioSctpChannel 创建 channel

java - 无法在 Spring Webflux 2.1.0.RELEASE 中启动 Netty 服务器

java - 包装 UI 组件的模式

java - 客户端/服务器设计模式

networking - 每个 IP 地址限制 1 票?