java - Netty decoder error

Tags: java networking netty

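I'm trying to write a simple TCP server/client system that exchanges packets. When the client becomes active, it sends one packet to the server. The server receives it fine, but then the client throws an exception (shown below) and I'm not sure why. The client should not be receiving any data back.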

Client

public class Client {

    public static void main(String[] args) throws Exception {
        new Client("localhost", 8000);
    }

    private Channel channel;

    public Client(final String host, final int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            new Bootstrap()
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("decoder", new PacketDecoder());
                    ch.pipeline().addLast("encoder", new PacketEncoder());
                    ch.pipeline().addLast(new Handler());
                }
            })
            .connect(host, port).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class Handler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            channel = ctx.channel();
            System.out.println("Connected");
            //channel.writeAndFlush(new SimplePacket(25));
            channel.writeAndFlush(new SimplePacket(50));
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
            System.out.println("Disconnected");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {       
            Packet packet = (Packet) msg;
            System.out.println("Received packet: " + packet.getId() + " | " + packet.toString());
        }

        @Override
        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }

    }

}

Server

public class Server {

    public static void main(String[] args) throws Exception {
        new Server(8000);
    }

    private final Set<Channel> channels = new HashSet<Channel>();

    public Server(final int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("decoder", new PacketDecoder());
                    ch.pipeline().addLast("encoder", new PacketEncoder());
                    ch.pipeline().addLast(new Handler());
                }
            });         
            b.bind(port)
            .addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {           
                        System.out.println("Listening on " + port);
                    } else {
                        System.out.println("Could not bind to host");
                    }
                }
            })
            .sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class Handler extends ChannelInboundHandlerAdapter {

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            channels.add(ctx.channel());
            System.out.println("Client connected [" + channels.size() + "]: " + ctx.channel().remoteAddress()); 
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            channels.remove(ctx.channel());
            System.out.println("Client disconnected [" + channels.size() + "]: " + ctx.channel().remoteAddress());
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Packet packet = (Packet) msg;
            System.out.println("Received packet: " + packet.getId() + " | " + packet.toString());
        }

        @Override
        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }

    }

}

Packet decoder

public class PacketDecoder extends ByteToMessageDecoder {

    private final PacketManager packetManager;

    public PacketDecoder(PacketManager packetManager) {
        this.packetManager = packetManager;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        if (buf.readableBytes() < 4) {
            return;
        }
        while (buf.readableBytes() != 0) {
            int id = buf.readInt();
            if (buf.readableBytes() != 0) { 
                if (packetManager.isRegistered(id)) {
                    Packet packet = packetManager.getPacket(id);
                    packet.read(buf);
                    out.add(packet);
                } else {
                    buf.skipBytes(buf.readableBytes());
                    throw new DataException("Cannot receive unregistered packet: " + id);
                }
            }
        }
    }

}

Packet encoder

public class PacketEncoder extends MessageToByteEncoder<Packet> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf buf) throws Exception {
        buf.writeInt(packet.getId());
        packet.write(buf);
    }

}

SimplePacket class

public class SimplePacket extends Packet {

    private int data;

    public SimplePacket(int data) {
        this.data = data;
    }

    public SimplePacket() {

    }

    @Override
    public void read(ByteBuf buf) {
        data = buf.readInt();
    }   

    @Override
    public void write(ByteBuf buf) {
        buf.writeInt(data);
    }

    @Override
    public int getId() {
        return 1000;
    }

    @Override
    public String toString() {
        return "{" + data + "}";
    }

}

Exception

io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(78) + length(4) exceeds writerIndex(80): UnpooledUnsafeDirectByteBuf(ridx: 78, widx: 80, cap: 80)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:257)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:139)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(78) + length(4) exceeds writerIndex(80): UnpooledUnsafeDirectByteBuf(ridx: 78, widx: 80, cap: 80)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1161)
    at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:612)
    at dataserver.packet.Packet.readString(Packet.java:21)
    at dataserver.packet.packets.SequencePacket.read(SequencePacket.java:74)
    at dataserver.packet.codec.PacketDecoder.decode(PacketDecoder.java:27)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:226)
    ... 10 more
io.netty.handler.codec.DecoderException: dataserver.DataException: Cannot receive unregistered packet: 2
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:257)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:139)
    at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
    at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
    at java.lang.Thread.run(Thread.java:724)
Caused by: dataserver.DataException: Cannot receive unregistered packet: 2
    at dataserver.packet.codec.PacketDecoder.decode(PacketDecoder.java:31)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:226)
    ... 10 more

The new packet I'm using (the data size can vary)

public class SequencePacket extends Packet {

private static final Map<Character, Class<? extends Object>> types = new HashMap<Character, Class<? extends Object>>();

static {
    types.put('b', Byte.class);
    types.put('f', Float.class);
    types.put('d', Double.class);
    types.put('s', Short.class);
    types.put('i', Integer.class);
    types.put('l', Long.class);
    types.put('c', Character.class);
    types.put('S', String.class);
    types.put('B', Boolean.class);
}

private final List<Object> data = new ArrayList<Object>();

public SequencePacket() {

}

public SequencePacket(Object...objects) {
    for (Object object : objects) {
        write(object);
    }
}

@Override
public void read(ByteBuf buf) {
    String sequence = Packet.readString(buf).trim();
    System.out.println("Sequence: " + sequence);
    char[] split = sequence.toCharArray();
    for (int i = 0; i < split.length; i++) {
        char c = split[i];
        if (!types.containsKey(c)) {
            throw new DataException("Bad sequence character in " + sequence + ": " + c);
        }
        switch (c) {
            case 'b':
                data.add(buf.readByte());
                break;
            case 'f':
                data.add(buf.readFloat());
                break;
            case 'd':
                data.add(buf.readDouble());
                break;
            case 's':
                data.add(buf.readShort());
                break;
            case 'i':
                data.add(buf.readInt());
                break;
            case 'l':
                data.add(buf.readLong());
                break;
            case 'c':
                data.add(buf.readChar());
                break;
            case 'S':
                data.add(Packet.readString(buf));
                break;
            case 'B':
                data.add(buf.readBoolean());
                break;
        }
    }
}

@Override
public void write(ByteBuf buf) {
    StringBuilder sequence = new StringBuilder();
    for (Object object : data) {
        sequence.append(getType(object.getClass()));
    }
    Packet.writeString(buf, sequence.toString());
    for (Object object : data) {
        switch (getType(object.getClass())) {
            case 'b':
                buf.writeByte((Byte) object);
                break;
            case 'f':
                buf.writeFloat((Float) object);
                break;
            case 'd':
                buf.writeDouble((Double) object);
                break;
            case 's':
                buf.writeShort((Short) object);
                break;
            case 'i':
                buf.writeInt((Integer) object);
                break;
            case 'l':
                buf.writeLong((Long) object);
                break;
            case 'c':
                buf.writeChar((Character) object);
                break;
            case 'S':
                Packet.writeString(buf, (String) object);
                break;
            case 'B':
                buf.writeBoolean((Boolean) object);
                break;
        }
    }
}

@Override
public int getId() {
    return 0;
}

public SequencePacket write(Object o) {
    if (!types.containsValue(o.getClass())) {
        throw new DataException("Cannot add object type to sequence: " + o.getClass().getSimpleName());
    }
    data.add(o);
    return this;
}

public byte getByte(int index) {
    return (Byte) data.get(index);
}

public float getFloat(int index) {
    return (Float) data.get(index);
}

public double getDouble(int index) {
    return (Double) data.get(index);
}

public short getShort(int index) {
    return (Short) data.get(index);
}

public int getInt(int index) {
    return (Integer) data.get(index);
}

public long getLong(int index) {
    return (Long) data.get(index);
}

public char getChar(int index) {
    return (Character) data.get(index);
}

public String getString(int index) {
    return data.get(index).toString();
}

public boolean getBoolean(int index) {
    return (Boolean) data.get(index);
}

public Object getObject(int index) {
    return data.get(index);
}

public boolean isByte(int index) {
    return data.get(index).getClass() == Byte.class;
}

public boolean isFloat(int index) {
    return data.get(index).getClass() == Float.class;
}

public boolean isDouble(int index) {
    return data.get(index).getClass() == Double.class;
}

public boolean isShort(int index) {
    return data.get(index).getClass() == Short.class;
}

public boolean isInt(int index) {
    return data.get(index).getClass() == Integer.class;
}

public boolean isLong(int index) {
    return data.get(index).getClass() == Long.class;
}

public boolean isChar(int index) {
    return data.get(index).getClass() == Character.class;
}

public boolean isString(int index) {
    return data.get(index).getClass() == String.class;
}

public boolean isBoolean(int index) {
    return data.get(index).getClass() == Boolean.class;
}

public List<Object> getAll() {
    return data;
}

public int size() {
    return data.size();
}

public boolean isEmpty() {
    return data.isEmpty();
}

public SequencePacket clear() {
    data.clear();
    return this;
}

public boolean hasIndex(int index) {
    return index >= 0 && index < data.size();
}

public Class<? extends Object> getClass(int index) {
    return data.get(index).getClass();
}

private char getType(Class<? extends Object> clazz) {
    char c = ' ';
    for (Entry<Character, Class<? extends Object>> entry : types.entrySet()) {
        if (entry.getValue() == clazz) {
            c = entry.getKey();
            break;
        }
    }
    if (c == ' ') {
        throw new DataException("Could not find type in sequence: " + clazz.getSimpleName());
    }
    return c;
}

@Override
public String toString() {
    StringBuilder result = new StringBuilder();
    result.append("{");
    if (data != null) {
        for (Object object : data) {
            result.append(object.toString());
            result.append(", ");
        }
        if (result.length() > 2) {
            result.setLength(result.length() - 2);
        }
    }
    result.append("}");
    return result.toString();
}

}

Best answer

decode is called whenever the channel goes inactive; this is default behavior inherited from ByteToMessageDecoder. To fix this, check for an empty buffer in PacketDecoder.decode and simply return if the buffer is empty.

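Exception 2 happens because the logic in your code is as follows:

  1. Create a SimplePacket
  2. Have the SimplePacket read one int from the buffer
  3. If there is data left in the buffer, throw an exception
  4. If there is no data left, add the packet to the out list

Since you send two SimplePackets from the client, the exception is bound to be thrown. To fix this, create the SimplePackets in a loop on the server: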

while(buf.readableBytes() != 0){
    Packet packet = new SimplePacket();
    packet.read(buf);
    out.add(packet);
}
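
The same loop can be sketched so that it runs without Netty on the classpath. This is only an illustration, not the asker's code: java.nio.ByteBuffer stands in for Netty's ByteBuf, the class name FixedSizeDecoderSketch is hypothetical, and the 8-byte packet size assumes the layout that PacketEncoder writes for a SimplePacket (a 4-byte id followed by a 4-byte data value). The loop condition compares against the packet size rather than zero, so a trailing partial packet is left unread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the decoder loop; ByteBuffer replaces Netty's ByteBuf.
final class FixedSizeDecoderSketch {

    // Assumed wire size of one SimplePacket: 4-byte id + 4-byte data.
    static final int PACKET_SIZE = 8;

    // Id returned by SimplePacket.getId() in the question.
    static final int SIMPLE_PACKET_ID = 1000;

    // Decodes every complete packet in the buffer and returns the data values.
    // Bytes belonging to an incomplete trailing packet are left unread.
    static List<Integer> decodeAll(ByteBuffer buf) {
        List<Integer> out = new ArrayList<>();
        while (buf.remaining() >= PACKET_SIZE) {
            int id = buf.getInt();
            int data = buf.getInt();
            if (id != SIMPLE_PACKET_ID) {
                throw new IllegalStateException("Unexpected packet id: " + id);
            }
            out.add(data);
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(32);
        buf.putInt(SIMPLE_PACKET_ID).putInt(25);
        buf.putInt(SIMPLE_PACKET_ID).putInt(50);
        buf.putShort((short) 0); // first 2 bytes of a packet that has not fully arrived
        buf.flip();

        System.out.println(decodeAll(buf)); // prints [25, 50]
        System.out.println(buf.remaining()); // prints 2
    }
}
```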

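With TCP, the data you send is delivered reliably and in the same order, but not necessarily in the same chunk sizes in which it was sent. If you don't start thinking this way, you will keep introducing bugs into your code. I expect your last exception :) is caused by the code below, which starts reading bytes from the buffer regardless of how many bytes are actually available. Checking that the buffer has data is not enough; you also need to check that it has enough data. Until it does, read nothing and let the data accumulate until the number of readable bytes is greater than or equal to the minimum number of bytes your protocol can parse.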

while (buf.readableBytes() != 0) {
    int id = buf.readInt();
    if (packetManager.isRegistered(id)) {
        Packet packet = packetManager.getPacket(id);
        packet.read(buf);
        out.add(packet);
    } else {
        buf.skipBytes(buf.readableBytes());
        throw new DataException("Cannot receive unregistered packet: " + id);
    }
}
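
For variable-sized packets such as SequencePacket, one common way to know whether "enough data" has arrived is to prefix each packet with its length. The following is a minimal sketch of that idea under stated assumptions, not the asker's protocol: the 4-byte length prefix is an addition to the wire format, java.nio.ByteBuffer stands in for Netty's ByteBuf so the example runs on its own, and the class LengthPrefixedFramerSketch and its feed method are hypothetical names. In Netty itself the equivalent moves would be ByteBuf.markReaderIndex() and resetReaderIndex() inside decode, or placing a LengthFieldBasedFrameDecoder ahead of the packet decoder.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical framer: accumulates bytes and emits only complete frames.
final class LengthPrefixedFramerSketch {

    // Unread bytes carried over from earlier chunks.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Accepts one chunk as it arrived from the socket and returns the bodies
    // of all frames that are now complete. Assumed frame layout: 4-byte length, then body.
    List<byte[]> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<byte[]> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark(); // remember where this frame starts
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalStateException("Negative frame length: " + length);
            }
            if (merged.remaining() < length) {
                merged.reset(); // body incomplete: un-read the length and wait for more data
                break;
            }
            byte[] body = new byte[length];
            merged.get(body);
            frames.add(body);
        }
        pending = merged; // position now points at the first unread byte
        return frames;
    }

    public static void main(String[] args) {
        byte[] wire = ByteBuffer.allocate(9)
                .putInt(5)
                .put("hello".getBytes(StandardCharsets.US_ASCII))
                .array();

        LengthPrefixedFramerSketch framer = new LengthPrefixedFramerSketch();
        // The frame arrives split across two reads, as TCP permits.
        System.out.println(framer.feed(Arrays.copyOfRange(wire, 0, 6)).size()); // prints 0
        List<byte[]> frames = framer.feed(Arrays.copyOfRange(wire, 6, 9));
        System.out.println(new String(frames.get(0), StandardCharsets.US_ASCII)); // prints hello
    }
}
```

The first call returns nothing because only two of the five body bytes have arrived; the second call completes the frame. A packet's read method would then run against a complete body and could no longer run past the end of the buffer.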

Source: the original question, "java - Netty decoder error", on Stack Overflow: https://stackoverflow.com/questions/21214623/
