java - Netty 4 - 池返回一个尚未准备好发送实际消息的 channel

标签 java tcp netty

我已经创建了一个 SimpleChannelInboundHandler 类型的入站处理程序并添加到管道中。我的意图是每次建立连接时,我想发送一条称为 session 打开消息的应用程序消息,并使连接准备好发送实际消息。为此,上面的入站处理程序 越过发送 session 打开消息的 channelActive(),作为响应,我会收到一条 session 打开确认消息。只有在那之后,我才能发送任意数量的实际业务消息。我正在使用 FixedChannelPool 并按如下方式初始化。这在启动时效果很好。但是如果远程主机关闭连接,之后如果调用下面的 sendMessage() 发送消息,则该消息甚至在 session 打开消息之前通过 channelActive() 发送并获得其响应。因此服务器会忽略该消息,因为在发送业务消息时 session 尚未打开。

我正在寻找的是,池应该只返回那些已经调用 channelActive() 事件的 channel ,这些 channel 已经发送了 session 打开消息并且它已经从服务器获得了 session 打开确认消息。如何处理这种情况?

public class SessionHandler extends SimpleChannelInboundHandler<byte[]> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        if (ctx.channel().isWritable()) {
            ctx.channel().writeAndFlush("open session message".getBytes()).;
        }
    }
}

// At the time of loading the applicaiton
public void init() {
    final Bootstrap bootStrap = new Bootstrap();
    bootStrap.group(group).channel(NioSocketChannel.class).remoteAddress(hostname, port);
    fixedPool = new FixedChannelPool(bootStrap, getChannelHandler(), 5);

    // This is done to intialise connection and the channelActive() from above handler is invoked to keep the session open on startup
    for (int i = 0; i < config.getMaxConnections(); i++) {
        fixedPool.acquire().addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> future) throws Exception {
                if (future.isSuccess()) {

                } else {
                    LOGGER.error(" Channel initialzation failed...>>", future.cause());
                }

            }
        });
    }
}

//应用程序调用以下方法实际发送消息。

public void sendMessage(final String businessMessage) {
    fixedPool.acquire().addListener(new FutureListener<Channel>() {
        @Override
        public void operationComplete(Future<Channel> future) throws Exception {
            if (future.isSuccess()) {
                Channel channel = future.get();
                if (channel.isOpen() && channel.isActive() && channel.isWritable()) {
                    channel.writeAndFlush(businessMessage).addListener(new GenericFutureListener<ChannelFuture>() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                // success msg
                            } else {
                                // failure msg
                            }
                        }
                    });
                    fixedPool.release(channel);
                }
            } else {
                // Failure
            }
        }
    });
}

最佳答案

如果没有特定原因需要使用 FixedChannelPool,那么您可以使用其他数据结构(List/Map)来存储 Channels。您可以在发送打开 session 消息后向数据结构添加一个 channel ,并在 channelInactive 方法中将其删除。

如果您需要对 channel 执行批量操作,您可以使用 ChannelGroup为了这个目的。

如果您仍想使用FixedChannelPool,您可以在 channel 中设置一个关于是否发送打开消息的属性:

ctx.channel().attr(OPEN_MESSAGE_SENT).set(true);

您可以在您的sendMessage 函数中获取如下属性:

boolean sent = ctx.channel().attr(OPEN_MESSAGE_SENT).get();

并且在 channelInactive 中,您可以将其设置为 false 或将其删除。

注意 OPEN_MESSAGE_SENT 是一个 AttributeKey:

public static final AttributeKey<Boolean> OPEN_MESSAGE_SENT = AttributeKey.valueOf("OPEN_MESSAGE_SENT");

关于java - Netty 4 - 池返回一个尚未准备好发送实际消息的 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53459500/

相关文章:

C++:继承 std::basic_streambuf 的问题

java - Netty 帧解码器在多条消息上损坏

java - Netty 中是否有类似 transferFrom 的零拷贝功能?

java - Netty 4.1 还需要 setUseClientMode 吗?

java - 无法在我的 JPanel 中绘制图像

linux - TCP RST包能减少连接超时吗?

java - 在 Android 中放置应用内更新实现的位置

sockets - 为什么 FreeBSD 获取 TCP 套接字选项需要锁定写锁?

java - 均匀间隔的刻度标记

java - docker环境下spring boot外部配置: application. yml不存在