java - Netty客户端只能接收服务器的一次响应

标签 java netty

最近,我用netty 4写了一个客户端和服务器,客户端向服务器发送了5条消息,但客户端只能收到服务器的一条响应,这是我的代码。

DTO:RpcRequest

package com.huawei.simple.rpc.base.request.dto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

import java.lang.reflect.Parameter;

/**
 * Request message sent from the RPC client to the server.
 *
 * <p>Lombok generates the getters, setters, the no-argument and all-argument
 * constructors and {@code toString()}.
 */
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest {
    /** Identifier of this request; the server handler copies it into the response. */
    private String requestId;

    /** Interface name carried by the request (presumably the service to invoke). */
    private String interfaceName;

    /** Method name carried by the request (presumably the method to invoke). */
    private String methodName;
}

DTO:RpcResponse

package com.huawei.simple.rpc.base.response.dto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
/**
 * Response message sent from the RPC server back to the client.
 *
 * <p>Lombok generates the getters, setters, the no-argument and all-argument
 * constructors and {@code toString()}.
 */
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class RpcResponse {
    /** Identifier of the request this response belongs to. */
    private String requestId;

    /** Message text of the response (presumably human-readable status; confirm with callers). */
    private String responseMessage;

    /** Numeric response code; its meaning is not defined in the code shown here. */
    private int responseCode;

    /** Payload of the response; its concrete type is not fixed by this class. */
    private Object responseBody;
}

编解码器

/**
 * Inbound decoder that turns length-prefixed JSON frames into {@link RpcResponse} objects.
 *
 * <p>Expected frame layout: a 4-byte length read with {@code readInt()}, followed by
 * exactly that many bytes of JSON.
 */
public class ResponseDecoder extends ByteToMessageDecoder {
    private static ObjectMapper mapper = new ObjectMapper();

    /**
     * Decodes at most one complete frame per call; emits nothing while a frame is incomplete.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param byteBuf accumulated inbound bytes
     * @param list receives the decoded {@link RpcResponse}
     * @throws Exception if the payload cannot be parsed as an {@link RpcResponse}
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Only proceed once the whole 4-byte length prefix has arrived.
        if (byteBuf.readableBytes() >= 4) {
            // Remember where the frame starts so a partial frame can be retried later.
            byteBuf.markReaderIndex();
            final int payloadSize = byteBuf.readInt();
            if (byteBuf.readableBytes() >= payloadSize) {
                final byte[] payload = new byte[payloadSize];
                byteBuf.readBytes(payload);
                list.add(mapper.readValue(payload, RpcResponse.class));
            } else {
                // Body not complete yet: rewind to the start of the length prefix.
                byteBuf.resetReaderIndex();
            }
        }
    }
}
/**
 * Outbound encoder that serializes an {@link RpcResponse} to JSON and writes it
 * behind a 4-byte length prefix.
 */
public class ResponseEncoder extends MessageToByteEncoder<RpcResponse> {
    private static ObjectMapper mapper = new ObjectMapper();
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse, ByteBuf byteBuf) throws Exception {
        // JSON payload and its length: this is the part ResponseDecoder reads back.
        byte[] bytes = mapper.writeValueAsBytes(rpcResponse);
        int dataLength = bytes.length;
        byteBuf.writeInt(dataLength);
        byteBuf.writeBytes(bytes);
        // NOTE(review): these bytes are written after the payload and are not covered by
        // the length prefix above. ResponseDecoder consumes only the 4-byte length plus
        // that many bytes, so it never reads them as part of this frame.
        // SerializationUtil is not shown here, so the exact bytes it produces are unknown.
        byteBuf.writeBytes(SerializationUtil.serialize(rpcResponse));

    }
}
/**
 * Inbound decoder that turns length-prefixed JSON frames into {@link RpcRequest} objects.
 *
 * <p>Expected frame layout: a 4-byte length read with {@code readInt()}, followed by
 * exactly that many bytes of JSON.
 */
public class RequestDecoder extends ByteToMessageDecoder {
    private static ObjectMapper mapper = new ObjectMapper();

    /**
     * Decodes at most one complete frame per call; emits nothing while a frame is incomplete.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param byteBuf accumulated inbound bytes
     * @param list receives the decoded {@link RpcRequest}
     * @throws Exception if the payload cannot be parsed as an {@link RpcRequest}
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Only proceed once the whole 4-byte length prefix has arrived.
        if (byteBuf.readableBytes() >= 4) {
            // Remember where the frame starts so a partial frame can be retried later.
            byteBuf.markReaderIndex();
            final int payloadSize = byteBuf.readInt();
            if (byteBuf.readableBytes() >= payloadSize) {
                final byte[] payload = new byte[payloadSize];
                byteBuf.readBytes(payload);
                list.add(mapper.readValue(payload, RpcRequest.class));
            } else {
                // Body not complete yet: rewind to the start of the length prefix.
                byteBuf.resetReaderIndex();
            }
        }
    }
}
/**
 * Outbound encoder that writes an {@link RpcRequest} as one length-prefixed JSON frame:
 * a 4-byte length followed by that many bytes of JSON.
 */
public class RequestEncoder extends MessageToByteEncoder<RpcRequest> {
    private static ObjectMapper mapper = new ObjectMapper();

    /**
     * Serializes the request to JSON and appends the frame to the outbound buffer.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param rpcRequest the request to encode
     * @param byteBuf buffer that receives the length prefix and the JSON payload
     * @throws Exception if the request cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest, ByteBuf byteBuf) throws Exception {
        final byte[] payload = mapper.writeValueAsBytes(rpcRequest);
        // Length prefix first, then the payload it describes.
        byteBuf.writeInt(payload.length).writeBytes(payload);
    }
}

客户端

/**
 * Demo RPC client: connects to {@code localhost:8080} and blocks until the channel closes.
 */
public class RpcClient {
    public static void main(String[] args) throws Exception {
        RpcClient client = new RpcClient();
        client.build();
    }

    /**
     * Configures the client pipeline, connects to the server and waits for the channel to
     * close. The event loop group is always shut down before this method returns.
     */
    public void build() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap s = new Bootstrap();
        s.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Outbound: RpcRequest -> bytes. Inbound: bytes -> RpcResponse -> handler.
                    socketChannel.pipeline().addLast(new RequestEncoder());
                    socketChannel.pipeline().addLast(new ResponseDecoder());
                    socketChannel.pipeline().addLast(new RpcClientHandler());
                }
            });
        try {
            ChannelFuture future = s.connect(new InetSocketAddress("localhost", 8080)).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            group.shutdownGracefully();
        }
    }
}
/**
 * Client-side handler that sends five demo requests, one every five seconds, once the
 * channel becomes active, and prints every {@link RpcResponse} it receives.
 */
public class RpcClientHandler extends ChannelInboundHandlerAdapter {
    private static final AtomicInteger id = new AtomicInteger(1);

    /** Number of demo requests sent per connection. */
    private static final int REQUEST_COUNT = 5;

    /** Delay between two consecutive demo requests, in seconds. */
    private static final long REQUEST_INTERVAL_SECONDS = 5;

    /**
     * Schedules the demo requests on the channel's executor.
     *
     * <p>The requests are scheduled instead of being sent in a loop with
     * {@code TimeUnit.SECONDS.sleep(...)}: this callback runs on the channel's event loop,
     * and sleeping there prevents the same thread from reading any response until the
     * loop has finished.
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < REQUEST_COUNT; i++) {
            ctx.executor().schedule(new Runnable() {
                @Override
                public void run() {
                    sendRequest(ctx);
                }
            }, i * REQUEST_INTERVAL_SECONDS, TimeUnit.SECONDS);
        }
    }

    /** Builds one demo request and writes it, reporting the outcome of the write. */
    private void sendRequest(ChannelHandlerContext ctx) {
        if (!ctx.channel().isActive()) {
            // The connection was closed before this scheduled send ran.
            return;
        }
        RpcRequest request = new RpcRequest();
        request.setRequestId(id.getAndIncrement() + "");
        request.setMethodName("helloWorld");
        request.setInterfaceName("interface");
        ctx.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("Success");
                } else {
                    // Surface failed writes instead of dropping them silently.
                    System.err.println("Failed to send request: " + channelFuture.cause());
                }
            }
        });
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    /** Prints the decoded response; {@code msg} is expected to be an {@link RpcResponse}. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse response = (RpcResponse) msg;
        System.out.println("get reponse from server " + response.toString());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

服务器

/**
 * Demo RPC server: binds to {@code localhost:8080} and blocks until the server channel closes.
 */
public class RpcServer {
    public static void main(String[] args) throws Exception {
        RpcServer server = new RpcServer();
        server.build();
    }

    /**
     * Configures the server pipeline, binds the listening socket and waits for it to close.
     * Both event loop groups are always shut down before this method returns.
     */
    public void build() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap s = new ServerBootstrap();
        s.group(boss, worker).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 10)
            // TCP_NODELAY applies to accepted connections, not to the listening socket,
            // so it must be configured as a child option.
            .childOption(ChannelOption.TCP_NODELAY, true)
            .handler(new LoggingHandler(LogLevel.DEBUG))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Inbound: bytes -> RpcRequest -> handler. Outbound: RpcResponse -> bytes.
                    socketChannel.pipeline().addLast(new RequestDecoder());
                    socketChannel.pipeline().addLast(new ResponseEncoder());
                    socketChannel.pipeline().addLast(new RpcServerHandler());
                }
            });
        try {
            ChannelFuture future = s.bind(new InetSocketAddress("localhost", 8080)).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
/**
 * Server-side handler that prints every {@link RpcRequest} it receives and answers with an
 * {@link RpcResponse} carrying the same request id.
 */
public class RpcServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Handles one decoded request; {@code msg} is expected to be an {@link RpcRequest}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest request = (RpcRequest) msg;
        System.out.println("get request from client " + request.toString());
        RpcResponse response = new RpcResponse();
        response.setRequestId(request.getRequestId());
        ChannelFuture future = ctx.writeAndFlush(response);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                // The future is always done when this callback runs, so "Done" is printed
                // unconditionally; only isSuccess() tells whether the write succeeded.
                System.out.println("Done");
                if (!channelFuture.isSuccess()) {
                    System.err.println("Failed to send response: " + channelFuture.cause());
                }
            }
        });
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

输出是 服务器:

  1. 从客户端获取请求 RpcRequest(requestId=1, interfaceName=interface, methodName=helloWorld) 完成
  2. 从客户端获取请求 RpcRequest(requestId=2, interfaceName=interface, methodName=helloWorld) 完成
  3. 从客户端获取请求 RpcRequest(requestId=3, interfaceName=interface, methodName=helloWorld) 完成
  4. 从客户端获取请求 RpcRequest(requestId=4, interfaceName=interface, methodName=helloWorld) 完成
  5. 从客户端获取请求 RpcRequest(requestId=5, interfaceName=interface, methodName=helloWorld) 完成

客户端

  1. 成功
  2. 成功
  3. 成功
  4. 成功
  5. 成功
  6. 从服务器获取响应 RpcResponse(requestId=1, responseMessage=null, responseCode=0, responseBody=null)

为什么我的客户端只从服务器收到一个响应?

最佳答案

您要么在 ResponseEncoder 中犯了错误,要么在 ResponseDecoder 中犯了错误(我猜是前者)。

/**
 * Inbound decoder that turns length-prefixed JSON frames into {@link RpcResponse} objects.
 *
 * <p>It reads a 4-byte length with {@code readInt()} and then exactly that many bytes of
 * JSON; it consumes nothing else from the buffer for a frame.
 */
public class ResponseDecoder extends ByteToMessageDecoder {
    private static ObjectMapper mapper = new ObjectMapper();

    /**
     * Decodes at most one complete frame per call; emits nothing while a frame is incomplete.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param byteBuf accumulated inbound bytes
     * @param list receives the decoded {@link RpcResponse}
     * @throws Exception if the payload cannot be parsed as an {@link RpcResponse}
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Only proceed once the whole 4-byte length prefix has arrived.
        if (byteBuf.readableBytes() >= 4) {
            // Remember where the frame starts so a partial frame can be retried later.
            byteBuf.markReaderIndex();
            final int frameLength = byteBuf.readInt();
            if (byteBuf.readableBytes() >= frameLength) {
                final byte[] frame = new byte[frameLength];
                byteBuf.readBytes(frame);
                list.add(mapper.readValue(frame, RpcResponse.class));
            } else {
                // Body not complete yet: rewind to the start of the length prefix.
                byteBuf.resetReaderIndex();
            }
        }
    }
}
/**
 * Outbound encoder that writes an {@link RpcResponse} as one length-prefixed JSON frame.
 *
 * <p>Frame layout: a 4-byte length written with {@code writeInt}, followed by exactly that
 * many bytes of JSON. This is the layout ResponseDecoder reads back.
 */
public class ResponseEncoder extends MessageToByteEncoder<RpcResponse> {
    private static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Serializes the response to JSON and appends the frame to the outbound buffer.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param rpcResponse the response to encode
     * @param byteBuf buffer that receives the length prefix and the JSON payload
     * @throws Exception if the response cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse, ByteBuf byteBuf) throws Exception {
        byte[] bytes = mapper.writeValueAsBytes(rpcResponse);
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
        // Nothing may be written after the payload. The encoder in the question also appended
        // SerializationUtil.serialize(rpcResponse) here. The decoder never consumes those
        // bytes as part of the frame, so it read them as the length prefix of the next
        // frame and all later responses were lost.
    }
}

您的编码器将响应写入为:

int: length of bytes
bytes: The result of the mapper
serialize: the result of the serialize operation

但是这些在解码器内部被读取为:

int: length of bytes
bytes: The result of the mapper

您要么忘了在解码器中处理这些额外的字节,要么在编码器中多发送了数据。

这会导致后续的响应无法被读取:解码器读完第一帧之后,会把多出来的“序列化”数据的前 4 个字节当作下一帧的长度。我推测这些数据并不是以零开头的,因此解析出的长度值异常大,客户端就会一直等待更多的数据。

关于java - Netty客户端只能接收服务器的一次响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49942522/

相关文章:

java - 从其他客户端解密 RSA 时发生 BadPaddingException

websocket - Netty websocket服务器高可用性

java - Applet + Netty 4.0 异常初始化错误

netty - 在netty中,我们只能写入和接收小于1024bytes的数据 : how we can write or receive more?

java - Netty - 不能用一个 CookieEncoder 设置多个 cookie

java - Linux编译Java项目

java - Espresso 图像选择器 - CursorIndexOutOfBoundsException 错误

java - 将 ExpandableListView 放在使用操作栏创建的选项卡 fragment 下?

java - 线程 Java 中断

java - Netty:bootstrap.connect 成功后的 channel 是否处于活动(active)状态?