java - Netty ByteToMessageDecoder 运行的次数超出了它必须的次数

标签 java server netty decode

我正在尝试使用 netty 编写一个简单的客户端服务器应用程序。 我关注了this教程,特别是时间服务器模型和 POJO 模型。我的问题是 ByteToMessageDecoder :它运行的次数比它必须的要多,这意味着当它读取空 ByteBuf 时它不会停止,而是会再读取一次,并且出于某种我无法理解的原因,它会找到我的客户端的上一条消息发送!我确信客户端只发送一次所述消息!

所以想法是这样的:一个简单的客户端服务器模型,其中客户端发送包含“hello world”消息的“DataPacket”,服务器使用带有“ACK”的“DataPacket”进行响应。我使用 DataPacket 类型是因为将来我想在其中传递更多内容,添加 header 并构建一些更复杂的内容...但对于初学者来说,我需要看看我在这个类型中做错了什么...

错误: Error of my application

正如你所看到的,服务器正常启动,我的客户端(收发器)发送消息,编码器激活并将其从DataPacket转换为ByteBuf,消息从服务器发送和接收,来自服务器的解码器激活并转换它从 ByteBuf 到 DataPacket,然后服务器相应地处理它...它应该发送 ACK 并向后重复相同的操作,但这是出现问题的地方,我不明白为什么。

我在这里阅读了一些帖子,并且已经尝试了 LengthFieldBasedFrameDecoder,它不起作用,我也想看看这个如果可能的话有什么问题,而不是使用其他东西......

代码:

编码器和解码器类:

 package org.client_server;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;

public class EncoderDecoder {

    public static class NettyEncoder extends MessageToByteEncoder<DataPacket> {

        @Override
        protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out)
                throws Exception {
            System.out.println("Encode: "+msg.getData());
            out.writeBytes(msg.convertData());
        }       
    }

    public static class NettyDecoder extends ByteToMessageDecoder{

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                List<Object> out) throws Exception {
            if((in.readableBytes() < 4) ) {
                return;
            }
            String msg = in.toString(CharsetUtil.UTF_8);
            System.out.println("Decode:"+msg);

            out.add(new DataPacket(msg));
        }

    }

}

服务器处理程序:

class DataAvroHandler extends ChannelInboundHandlerAdapter {


        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            try {
                DataPacket in = (DataPacket)msg;
                System.out.println("[Server]: Message received..."+in.getData());
            }finally {
                ReferenceCountUtil.release(msg);
                //ctx.close();
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx)
                throws Exception {
            System.out.println("[Server]: Read Complete...");
            DataPacket pkt = new DataPacket("ACK!");
            //pkt.setData(Unpooled.copiedBuffer("ACK", CharsetUtil.UTF_8));
            ctx.writeAndFlush(pkt);         
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            serverLog.warning("[Server]: Error..." + cause.toString());
            ctx.close();            
        }

客户端处理程序:

class DataAvroHandlerCl extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("[Transceiver]: Channel Active!!!");
        DataPacket pkt = new DataPacket("Hello World!");
        ChannelFuture f = ctx.writeAndFlush(pkt);
        //f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            DataPacket in = (DataPacket)msg;
            System.out.println("[Transceiver]: Message received..."+in.getData());
        }finally {
            ReferenceCountUtil.release(msg);
            //ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        transLog.warning("[Transceiver] : Error..." + cause.getMessage());
        ctx.close();
    }

}

服务器和客户端管道:

ch.pipeline().addLast("Decoder", new EncoderDecoder.NettyDecoder());
ch.pipeline().addLast("Encoder", new EncoderDecoder.NettyEncoder());
ch.pipeline().addLast("DataAvroHandler", new DataAvroHandler());

最佳答案

您的问题是由于在 NettyDecoder 中使用 ByteBuf intoString() 方法引起的。

引用 javadoc (http://netty.io/4.0/api/io/netty/buffer/ByteBuf.html#toString%28java.nio.charset.Charset%29):

This method does not modify readerIndex or writerIndex of this buffer.

现在,ByteToMessageDecoder 不知道您实际解码了多少字节!看起来您解码了 0 个字节,因为缓冲区的 readerIndex 未修改,因此您还会在控制台中收到错误消息。

您必须手动修改 readerIndex:

String msg = in.toString(CharsetUtil.UTF_8);
in.readerIndex(in.readerIndex() + in.readableBytes());
System.out.println("Decode:"+msg);

关于java - Netty ByteToMessageDecoder 运行的次数超出了它必须的次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34047446/

相关文章:

c - 简单的 TCP 服务器无法输出到 Web 浏览器

java - Netty中的UDP传输是否可以使用多个ChannelPipelineFactory

scala - 在Scala中使用Netty

java - 如何解决 "stale element reference: element is not attached to the page document when navigating to the previous page"

java - 如何使用二进制数技术对整数的数字进行排序?

java - 线程中出现异常 "AWT-EventQueue-0"java.lang.NullPointerException 错误,该怎么办?

python - 从 Golang 调用 Python 任务

java - Android Studio 2.2 Preview 3布局错误

php - 如何监听服务器上的新事件

java-8 - Java 检查平台是否支持 sctp,否则使用 tcp